20 June, 2013
Operation Logic Executor - Source code
This will be the last post in this series. If you found anything useful, please don't hesitate to leave a comment. You can download the latest version of the source code from GitHub. It is a little different from the code in the posts, but the idea stays the same.
Operation Logic Executor - Adding executors
The last version of the locking mechanism provides an easy way to control access to resources before task execution, but it is hard to lock resources at the moment of function invocation. To clarify what I mean, let's look at an example of two tasks executed asynchronously and handled by the same thread pool executor with only one thread.
- Both tasks (T1, T2) go to the blocking queue.
- ResourceLockAspect locks resources of T1.
- T1 is executed.
- ResourceLockAspect releases resources of T1.
- ResourceLockAspect locks resources of T2.
- T2 is executed.
- ResourceLockAspect releases resources of T2.
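The sequence above can be imitated in plain Java, without Spring or aspects. This is only a sketch under assumptions: the class `ExecutionTimeLockingSketch`, its `submit` helper and the event names are hypothetical and not part of the project. It shows that when the lock is taken at execution time, both tasks are accepted into the queue even though they target the same resource.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java stand-in (no Spring, no aspects); all names here are hypothetical.
class ExecutionTimeLockingSketch {

    // Identifiers of the currently locked resources.
    private static final Set<Long> LOCKED = ConcurrentHashMap.newKeySet();
    // Events recorded as they happen.
    private static final List<String> EVENTS =
            Collections.synchronizedList(new ArrayList<>());

    // The lock is taken inside the task, so only when the worker thread runs it.
    static Future<Boolean> submit(ExecutorService pool, String name, long resourceId) {
        EVENTS.add(name + " queued");
        return pool.submit(() -> {
            if (!LOCKED.add(resourceId)) {
                EVENTS.add(name + " rejected");
                return false;
            }
            try {
                EVENTS.add(name + " executed");
                return true;
            } finally {
                LOCKED.remove(resourceId); // released as soon as the task body ends
            }
        });
    }

    // Submits T1 and T2 for the same resource to a single-thread pool.
    static List<String> run() {
        EVENTS.clear();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> t1 = submit(pool, "T1", 203002322L);
            Future<Boolean> t2 = submit(pool, "T2", 203002322L);
            t1.get();
            t2.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a single worker thread T1 always releases its lock before T2 starts, so T2 is never rejected. It simply waits in the queue, which is exactly the behavior the rest of this post tries to avoid.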
If T1 and T2 operate on the same resource, the user may want to perform locking at the moment of invocation, to prevent T2 from being added to the queue. Adding a new wrapping object, on which we will lock resources and which then performs the operation asynchronously, will partially solve the problem. I used the idea of an executor to implement this functionality. The executor will in fact be a decorator of OperationLogic.

    public interface OperationExecutor<T> extends OperationLogic<T> {
        void setOperationLogic(OperationLogic<T> logic);
        OperationLogic<T> getOperationLogic();
    }

Here is an example of the executor's implementation. I set the bean's scope to prototype, but if its only property is an OperationLogic object with singleton scope and this property doesn't change, we could also use singleton scope for the executor.

    @Component
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public class TransactionProcessingExecutor implements OperationExecutor<Transaction> {
        private OperationLogic<Transaction> logic;

        @ResourceLock(identifierBuilder = TransactionIdentifierBuilder.class,
                      resourceLocker = TransactionLocker.class,
                      resourceLockerBeanName = "transactionLocker")
        public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
            return logic.executeOperation(data);
        }

        @Autowired
        @Qualifier(value = "transactionProcessingLogic")
        public void setOperationLogic(OperationLogic<Transaction> logic) {
            this.logic = logic;
        }

        public OperationLogic<Transaction> getOperationLogic() {
            return logic;
        }
    }

When executeOperation is invoked, resources are locked and then the injected operation is performed. If that operation is asynchronous, the invocation will occur and the resources will be released immediately. This is why I wrote earlier that the current executor implementation only partially solves the problem. What I'm going to do is define AsyncOperationLogic with a special method operationFinished, invoked by an aspect when the executeOperation method returns.

    public interface AsyncOperationLogic<T> extends OperationLogic<T> {
        public void operationFinished();
        public void setResourceReleaser(ResourceReleaser releaser);
    }

It also contains a setter method to define the object that will take care of releasing resources.
    @Aspect
    @Component
    public class ReleaseResourcesAspect {
        private Logger logger = LogManager.getLogger(ReleaseResourcesAspect.class);

        @After("execution(* pl.mariusz.marciniak.operations.logic.AsyncOperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))")
        private void release(JoinPoint jp) {
            logger.debug("releasing resources");
            ((AsyncOperationLogic<?>) jp.getTarget()).operationFinished();
        }
    }

Now I need to provide AsyncOperationLogic with the identifiers and the locking mechanism that were used for locking. I also need to skip the unlock in ResourceLockAspect. To achieve this I will extend the @ResourceLock annotation to indicate another bean that will take care of releasing resources

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface ResourceLock {
        Class<? extends IdentifierBuilder<?>> identifierBuilder();
        Class<? extends ResourceLocker> resourceLocker();
        String resourceLockerBeanName();
        String releaseResourceBeanName() default "";
    }

and change
the ResourceLockAspect.execute method

    @Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))"
            + " && @annotation(resourceLock)")
    private <T> Object execute(ProceedingJoinPoint pjp, ResourceLock resourceLock) throws Throwable {
        Object result = null;
        IdentifierBuilder<T> builder = resolveIdentifierBuilder(resourceLock);
        ResourceLocker locker = resolveResourceLocker(resourceLock);
        OperationData<T> operationData = (OperationData<T>) pjp.getArgs()[0];
        long[] objectsToLockIdentifiers = createIdentifiers(builder, operationData);
        // check if release resource bean is defined if yes initialize it
        boolean releaseResourceDefined = resourceLock.releaseResourceBeanName().length() > 0;
        initResourceReleaserOnAsyncOperationLogic(pjp.getTarget(),
                resourceLock.releaseResourceBeanName(), locker, objectsToLockIdentifiers);
        if (locker.lock(objectsToLockIdentifiers)) {
            try {
                result = pjp.proceed();
            } finally {
                if (!releaseResourceDefined) // release resource bean will take care of that
                    locker.unlock(objectsToLockIdentifiers);
            }
        } else {
            logger.info("Cannot perform operation " + operationData.getName() + ". Locked resources "
                    + new StrBuilder().appendWithSeparators(ArrayUtils.toObject(objectsToLockIdentifiers), ",").toString());
        }
        return result;
    }

    private void initResourceReleaserOnAsyncOperationLogic(Object target, String resourceReleaseBeanName,
            ResourceLocker locker, long[] objectsToLockIdentifiers) {
        AsyncOperationLogic<?> asyncOperationLogic = null;
        if (target instanceof AsyncOperationLogic<?>) {
            asyncOperationLogic = (AsyncOperationLogic<?>) target;
        } else if (target instanceof OperationExecutor<?>) {
            OperationExecutor<?> operationExecutor = (OperationExecutor<?>) target;
            if (operationExecutor.getOperationLogic() instanceof AsyncOperationLogic<?>) {
                asyncOperationLogic = (AsyncOperationLogic<?>) operationExecutor.getOperationLogic();
            }
        }
        if (asyncOperationLogic != null) {
            ResourceReleaser releaser = appContext.getBean(ResourceReleaser.class);
            releaser.setResourceLocker(locker);
            releaser.setResourcesIdentifiers(objectsToLockIdentifiers);
            asyncOperationLogic.setResourceReleaser(releaser);
        }
    }

There are two changes compared to the prior version:
- initResourceReleaserOnAsyncOperationLogic is added to initialize and set the ResourceReleaser object
- if a resource releaser is defined, the aspect doesn't unlock resources
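ResourceReleaser itself is not shown in this post. As a rough idea of what such an object could look like, here is a minimal plain-Java sketch. Everything in it is an assumption: `SketchResourceLocker` is only inferred from the `lock(...)` and `unlock(...)` calls in the aspect, `InMemoryLocker` is a hypothetical locker used to exercise the example, and `SketchResourceReleaser` mirrors the three methods the post calls on the releaser. The real classes on GitHub may differ.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Demo entry point; every type in this file is hypothetical.
class ReleaserSketchDemo {

    // Returns {first lock succeeded, second lock while held, lock after release}.
    static boolean[] run() {
        InMemoryLocker locker = new InMemoryLocker();
        long[] ids = { 203002322L };

        boolean firstLock = locker.lock(ids);

        // What the aspect does for an asynchronous logic: hand over locker and ids.
        SketchResourceReleaser releaser = new SketchResourceReleaser();
        releaser.setResourceLocker(locker);
        releaser.setResourcesIdentifiers(ids);

        boolean lockWhileHeld = locker.lock(ids); // should fail, still locked
        releaser.unlock();                        // what operationFinished() triggers
        boolean lockAfterRelease = locker.lock(ids);

        return new boolean[] { firstLock, lockWhileHeld, lockAfterRelease };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}

// Assumed shape of the locker, inferred from the calls made in the aspect.
interface SketchResourceLocker {
    boolean lock(long[] identifiers);
    void unlock(long[] identifiers);
}

// Hypothetical all-or-nothing locker kept in memory.
class InMemoryLocker implements SketchResourceLocker {
    private final Set<Long> locked = new HashSet<>();

    public synchronized boolean lock(long[] identifiers) {
        for (long id : identifiers) {
            if (locked.contains(id)) {
                return false; // at least one resource is taken, lock nothing
            }
        }
        for (long id : identifiers) {
            locked.add(id);
        }
        return true;
    }

    public synchronized void unlock(long[] identifiers) {
        for (long id : identifiers) {
            locked.remove(id);
        }
    }
}

// Remembers which locker and identifiers were used, so they can be released later.
class SketchResourceReleaser {
    private SketchResourceLocker locker;
    private long[] identifiers = new long[0];

    void setResourceLocker(SketchResourceLocker locker) {
        this.locker = locker;
    }

    void setResourcesIdentifiers(long[] identifiers) {
        this.identifiers = Arrays.copyOf(identifiers, identifiers.length);
    }

    void unlock() {
        if (locker != null) {
            locker.unlock(identifiers);
        }
    }
}
```

The releaser keeps its own copy of the identifiers because, in the post's design, a fresh releaser is created for every invocation and lives until the asynchronous operation calls `unlock()`.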
Here is an example of asynchronous operation logic

    @Component
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public class AsyncTransactionProcessingLogic implements AsyncOperationLogic<Transaction> {
        private Logger logger = LogManager.getLogger(AsyncTransactionProcessingLogic.class);

        @Autowired
        @Qualifier(value = "transactionProcessingLogic")
        private OperationLogic<Transaction> logic;

        private ResourceReleaser releaser;

        @Async(value = "singleOperationExecutor")
        public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
            return logic.executeOperation(data);
        }

        public void operationFinished() {
            logger.debug("operation finished");
            releaser.unlock();
        }

        public void setResourceReleaser(ResourceReleaser releaser) {
            this.releaser = releaser;
        }
    }

and the executor which will lock resources

    @Component
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public class TransactionProcessingExecutor implements OperationExecutor<Transaction> {
        private OperationLogic<Transaction> logic;

        @ResourceLock(identifierBuilder = TransactionIdentifierBuilder.class,
                      resourceLocker = TransactionLocker.class,
                      resourceLockerBeanName = "transactionLocker")
        public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
            return logic.executeOperation(data);
        }

        @Autowired
        @Qualifier(value = "transactionProcessingLogic")
        public void setOperationLogic(OperationLogic<Transaction> logic) {
            this.logic = logic;
        }

        public OperationLogic<Transaction> getOperationLogic() {
            return logic;
        }
    }

Executing 3 operations (2 trying to lock transaction 203002322 and 1 transaction 125593085) using a pool with only one thread can produce the following output:
    [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test with immediate lock
    [main] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [203002322]
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
    [main] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [125593085]
    [main] INFO pl.mariusz.marciniak.locking.aop.ResourceLockAspect - Cannot perform operation complicated Buy operation on account. Locked resources 203002322
    [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ReleaseResourcesAspect - releasing resources
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.AsyncTransactionProcessingLogic - operation finished
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - releasing lock on [203002322]
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ReleaseResourcesAspect - releasing resources
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.AsyncTransactionProcessingLogic - operation finished
    [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - releasing lock on [125593085]

The Buy operation couldn't access the locked resources when it was invoked, so the functionality works as desired.
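The same behavior can be imitated in plain Java, with the lock taken in the calling thread and released by the worker when the operation finishes. This is a sketch under assumptions, not the project's code: `InvocationTimeLockingSketch`, `invoke` and `demo` are hypothetical names, and a `CountDownLatch` is used only to keep the first operation running while the second one is invoked.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java stand-in for "lock at invocation, release when the operation finishes".
// All names are hypothetical; the post does this with Spring beans and aspects.
class InvocationTimeLockingSketch {

    private final Set<Long> locked = ConcurrentHashMap.newKeySet();
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    // Locks in the calling thread; returns null when the resource is already
    // locked, similar to the aspect returning a null result.
    Future<Boolean> invoke(long resourceId, Runnable work) {
        if (!locked.add(resourceId)) {
            return null; // rejected before it ever reaches the queue
        }
        try {
            return pool.submit(() -> {
                try {
                    work.run();
                    return true;
                } finally {
                    locked.remove(resourceId); // plays the role of operationFinished()
                }
            });
        } catch (RuntimeException e) {
            locked.remove(resourceId); // submission failed, so do not keep the lock
            throw e;
        }
    }

    // Returns {first completed, second rejected, third completed, lock still held}.
    static boolean[] demo() {
        InvocationTimeLockingSketch sketch = new InvocationTimeLockingSketch();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            Future<Boolean> sell = sketch.invoke(203002322L, () -> awaitQuietly(gate));
            Future<Boolean> buy = sketch.invoke(203002322L, () -> { });
            Future<Boolean> other = sketch.invoke(125593085L, () -> { });
            gate.countDown(); // let the first operation finish
            boolean sellDone = sell.get();
            boolean otherDone = other.get();
            return new boolean[] {
                sellDone, buy == null, otherDone, sketch.locked.contains(203002322L)
            };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            sketch.pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch gate) {
        try {
            gate.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

As in the log above, the second operation on 203002322 is turned away at invocation time, while the operation on 125593085 is queued and runs after the first one has released its lock.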