20 June, 2013

Operation Logic Executor - Adding executors

    The latest version of the locking mechanism provides an easy way to control access to resources before task execution, but it is hard to lock resources at the moment of function invocation. To clarify what I mean, let’s look at an example of two tasks that are executed asynchronously and handled by the same thread pool executor with only one thread.
  • Both tasks (T1, T2) go to the blocking queue.
  • ResourceLockAspect locks the resources of T1.
  • T1 is executed.
  • ResourceLockAspect releases the resources of T1.
  • ResourceLockAspect locks the resources of T2.
  • T2 is executed.
  • ResourceLockAspect releases the resources of T2.
If T1 and T2 operate on the same resource, the user may want to perform locking at the moment of invocation, to prevent T2 from being added to the queue. Adding a new wrapping object, which locks the resources and then performs the operation asynchronously, partially solves the problem. I used the idea of an executor to implement this functionality. The executor is in fact a decorator of OperationLogic.
public interface OperationExecutor<T> extends OperationLogic<T> {
    void setOperationLogic(OperationLogic<T> logic);
    OperationLogic<T> getOperationLogic();
}
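The decorator idea can be sketched in plain Java, without Spring or aspects. This is only an illustration under stated assumptions: Logic, SimpleLocker and LockingExecutor are hypothetical stand-ins and not classes from this project, the result is simplified to a boolean instead of Future<Boolean>, and Java 8 or later is assumed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class LockingExecutorSketch {

    /** Simplified stand-in for OperationLogic (hypothetical, boolean result). */
    interface Logic<T> {
        boolean execute(T data);
    }

    /** Hypothetical in-memory locker keyed by resource id. */
    static class SimpleLocker {
        private final Set<Long> locked = ConcurrentHashMap.newKeySet();

        boolean lock(long id) { return locked.add(id); }      // false when already locked
        void unlock(long id) { locked.remove(id); }
        boolean isLocked(long id) { return locked.contains(id); }
    }

    /** Decorator: locks on the caller's thread, then delegates to the wrapped logic. */
    static class LockingExecutor implements Logic<Long> {
        private final Logic<Long> delegate;
        private final SimpleLocker locker;

        LockingExecutor(Logic<Long> delegate, SimpleLocker locker) {
            this.delegate = delegate;
            this.locker = locker;
        }

        @Override
        public boolean execute(Long resourceId) {
            if (!locker.lock(resourceId)) {
                return false;                      // busy: refuse at invocation time
            }
            try {
                return delegate.execute(resourceId);
            } finally {
                locker.unlock(resourceId);         // released when the delegate returns
            }
        }
    }

    /** Returns {lock held during the call, lock released afterwards, busy resource refused}. */
    static boolean[] demo() {
        SimpleLocker locker = new SimpleLocker();
        Logic<Long> inner = id -> locker.isLocked(id);   // reports whether the lock is held
        LockingExecutor executor = new LockingExecutor(inner, locker);

        boolean heldDuringCall = executor.execute(42L);
        boolean releasedAfter = !locker.isLocked(42L);

        locker.lock(7L);                                 // someone else holds resource 7
        boolean refused = !executor.execute(7L);
        return new boolean[] {heldDuringCall, releasedAfter, refused};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

In this sketch the lock is taken on the calling thread, so a second call on a busy resource is refused before anything is queued. The lock is released as soon as the delegate returns, which is only correct while the delegate is synchronous.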
Here is an example executor implementation. I set the bean’s scope to prototype, but if its only property is an OperationLogic object with singleton scope and this property doesn’t change, we could also use singleton scope for the executor.
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TransactionProcessingExecutor implements OperationExecutor<Transaction> {

    private OperationLogic<Transaction> logic;

    @ResourceLock(identifierBuilder = TransactionIdentifierBuilder.class, resourceLocker = TransactionLocker.class, resourceLockerBeanName = "transactionLocker")
    public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
        return logic.executeOperation(data);
    }

    @Autowired
    @Qualifier(value = "transactionProcessingLogic")
    public void setOperationLogic(OperationLogic<Transaction> logic) {
        this.logic = logic;
    }

    public OperationLogic<Transaction> getOperationLogic() {
        return logic;
    }
}
When executeOperation is invoked, the resources are locked and then the injected operation is performed. If that operation is asynchronous, the invocation returns at once and the resources are released immediately. This is why I wrote earlier that the current executor implementation only partially solves the problem. What I’m going to do is define AsyncOperationLogic with a special method, operationFinished, which is invoked by an aspect when the executeOperation method returns:
public interface AsyncOperationLogic<T> extends OperationLogic<T> {
    
    public void operationFinished();
    
    public void setResourceReleaser(ResourceReleaser releaser);
    
}
It also contains a setter method to define the object that will take care of releasing resources.
@Aspect
@Component
public class ReleaseResourcesAspect {

    private Logger             logger = LogManager.getLogger(ReleaseResourcesAspect.class);

    @After("execution(* pl.mariusz.marciniak.operations.logic.AsyncOperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))")
    private void release(JoinPoint jp) {
        logger.debug("releasing resources");
        ((AsyncOperationLogic<?>)jp.getTarget()).operationFinished();
    }


}
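Why the release has to move to the end of the asynchronous method can be seen in a small plain-Java sketch. It is an illustration only: AsyncReleaseSketch, its LOCKED set and both methods are hypothetical and not part of this project, a plain ExecutorService stands in for Spring's @Async, and a latch forces the task to outlive the submitting call so that the outcome is deterministic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncReleaseSketch {

    /** Hypothetical lock table: ids of currently locked resources. */
    static final Set<Long> LOCKED = ConcurrentHashMap.newKeySet();

    /** Unlock in the caller's finally block: runs as soon as submit() returns. */
    static boolean releaseInCaller(ExecutorService pool, long id) throws Exception {
        CountDownLatch callerDone = new CountDownLatch(1);
        Future<Boolean> stillLocked;
        LOCKED.add(id);
        try {
            stillLocked = pool.submit(() -> {
                callerDone.await();           // the work outlives the submitting call
                return LOCKED.contains(id);   // is the resource still protected?
            });
        } finally {
            LOCKED.remove(id);
        }
        callerDone.countDown();
        return stillLocked.get();
    }

    /** Unlock at the end of the task itself, the role operationFinished() plays. */
    static boolean releaseInTask(ExecutorService pool, long id) throws Exception {
        CountDownLatch callerDone = new CountDownLatch(1);
        Future<Boolean> stillLocked;
        LOCKED.add(id);
        try {
            stillLocked = pool.submit(() -> {
                try {
                    callerDone.await();
                    return LOCKED.contains(id);
                } finally {
                    LOCKED.remove(id);
                }
            });
        } catch (RuntimeException e) {
            LOCKED.remove(id);   // task was never queued, so nobody else would unlock
            throw e;
        }
        callerDone.countDown();
        return stillLocked.get();
    }

    /** Returns {still locked with caller-side release, still locked with task-side release}. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return new boolean[] {releaseInCaller(pool, 1L), releaseInTask(pool, 2L)};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

With the caller-side release the task finds its resource already unlocked, while with the task-side release the resource stays locked until the work is done.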
Now I need to pass the identifiers and the locking mechanism that were used for locking to AsyncOperationLogic. I also need to skip the unlock in ResourceLockAspect. To achieve this I will extend the @ResourceLock annotation to indicate another bean that will take care of releasing resources
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ResourceLock {
    Class<? extends IdentifierBuilder<?>> identifierBuilder();
    Class<? extends ResourceLocker> resourceLocker();
    String resourceLockerBeanName();
    String releaseResourceBeanName() default "";
}
and change the ResourceLockAspect.execute method
    @Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))"
           +" && @annotation(resourceLock)")
    private <T> Object execute(ProceedingJoinPoint pjp, ResourceLock resourceLock) throws Throwable {
        Object result = null;
        IdentifierBuilder<T> builder = resolveIdentifierBuilder(resourceLock);
        ResourceLocker locker = resolveResourceLocker(resourceLock);
        OperationData<T> operationData = (OperationData<T>) pjp.getArgs()[0];
        long[] objectsToLockIdentifiers = createIdentifiers(builder, operationData);
        // check if a release resource bean is defined; if yes, initialize it
        boolean releaseResourceDefined = resourceLock.releaseResourceBeanName().length()>0;
        initResourceReleaserOnAsyncOperationLogic(pjp.getTarget(), resourceLock.releaseResourceBeanName(), locker, objectsToLockIdentifiers);

        if (locker.lock(objectsToLockIdentifiers)) {
            try {
                result = pjp.proceed();
            } finally {
                if(!releaseResourceDefined) // release resource bean will take care of that
                    locker.unlock(objectsToLockIdentifiers);
            }
        } else {
            logger.info("Cannot perform operation " + operationData.getName() + ". Locked resources "
                            + new StrBuilder().appendWithSeparators(ArrayUtils.toObject(objectsToLockIdentifiers), ",").toString());
        }
        return result;
    }

    private void initResourceReleaserOnAsyncOperationLogic(Object target, String resourceReleaseBeanName, ResourceLocker locker, long[] objectsToLockIdentifiers) {
        AsyncOperationLogic<?> asyncOperationLogic = null;
        if (target instanceof AsyncOperationLogic<?>) {
            asyncOperationLogic = (AsyncOperationLogic<?>)target;
        } else if(target instanceof OperationExecutor<?>) {
            OperationExecutor<?> operationExecutor = (OperationExecutor<?>)target;
            if (operationExecutor.getOperationLogic() instanceof AsyncOperationLogic<?>) {
                asyncOperationLogic = (AsyncOperationLogic<?>)operationExecutor.getOperationLogic();
            }            
        }
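        // initialize the releaser only when a release bean name is given on @ResourceLock;
        // otherwise the aspect itself unlocks in its finally block
        if(asyncOperationLogic != null && resourceReleaseBeanName.length() > 0) {
            ResourceReleaser releaser = appContext.getBean(resourceReleaseBeanName, ResourceReleaser.class);
            releaser.setResourceLocker(locker);
            releaser.setResourcesIdentifiers(objectsToLockIdentifiers);
            asyncOperationLogic.setResourceReleaser(releaser);
        }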
    }
There are two changes compared to the prior version:
  • initResourceReleaserOnAsyncOperationLogic is added to initialize and set the ResourceReleaser object
  • if a resource releaser is defined, the aspect doesn't unlock the resources
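The ResourceReleaser class itself is not listed in this post. Judging only from the calls made on it (setResourceLocker, setResourcesIdentifiers and unlock), a minimal version could look like the sketch below. Everything in it is an assumption: the ResourceLocker interface is reduced to the two methods the aspect uses, and the at-most-once guard is my own addition, not something the original class is known to have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class ResourceReleaserSketch {

    /** Assumed shape of ResourceLocker: only the two calls used by the aspect. */
    interface ResourceLocker {
        boolean lock(long[] ids);
        void unlock(long[] ids);
    }

    /** Hypothetical ResourceReleaser: remembers what was locked and by which locker. */
    static class ResourceReleaser {
        private ResourceLocker locker;
        private long[] ids = new long[0];
        private final AtomicBoolean released = new AtomicBoolean(false);

        void setResourceLocker(ResourceLocker locker) { this.locker = locker; }

        void setResourcesIdentifiers(long[] ids) { this.ids = ids.clone(); }

        /** Unlocks at most once, so a repeated operationFinished() call is harmless. */
        void unlock() {
            if (locker != null && released.compareAndSet(false, true)) {
                locker.unlock(ids);
            }
        }
    }

    /** Calls unlock() twice and returns how many times the locker was actually asked to unlock. */
    static int demo() {
        List<long[]> unlockCalls = new ArrayList<>();
        ResourceLocker recording = new ResourceLocker() {
            @Override public boolean lock(long[] ids) { return true; }
            @Override public void unlock(long[] ids) { unlockCalls.add(ids); }
        };
        ResourceReleaser releaser = new ResourceReleaser();
        releaser.setResourceLocker(recording);
        releaser.setResourcesIdentifiers(new long[] {203002322L});
        releaser.unlock();
        releaser.unlock();   // ignored by the guard
        return unlockCalls.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each releaser carries the identifiers of one particular invocation, such a bean would presumably need prototype scope, so that concurrent operations do not overwrite each other's identifiers.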
Let's test this functionality by defining asynchronous logic:
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class AsyncTransactionProcessingLogic implements AsyncOperationLogic<Transaction> {

    private Logger logger = LogManager.getLogger(AsyncTransactionProcessingLogic.class);

    @Autowired
    @Qualifier(value="transactionProcessingLogic")
    private OperationLogic<Transaction> logic;

    private ResourceReleaser releaser;

    @Async(value="singleOperationExecutor")
    public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
        return logic.executeOperation(data);
    }

    public void operationFinished() {
        logger.debug("operation finished");
        releaser.unlock();
    }
    
    public void setResourceReleaser(ResourceReleaser releaser) {
        this.releaser = releaser;
    }

}

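and the executor that will lock the resources. As listed, it is identical to the earlier version; for the release to be left to the releaser, its @ResourceLock annotation would also need a non-empty releaseResourceBeanName, and the injected logic would need to be the asynchronous one.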
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TransactionProcessingExecutor implements OperationExecutor<Transaction> {

    private OperationLogic<Transaction> logic;

    @ResourceLock(identifierBuilder = TransactionIdentifierBuilder.class, resourceLocker = TransactionLocker.class, resourceLockerBeanName = "transactionLocker")
    public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
        return logic.executeOperation(data);
    }

    @Autowired
    @Qualifier(value = "transactionProcessingLogic")
    public void setOperationLogic(OperationLogic<Transaction> logic) {
        this.logic = logic;
    }

    public OperationLogic<Transaction> getOperationLogic() {
        return logic;
    }
}
Executing 3 operations (2 trying to lock transaction 203002322 and 1 locking transaction 125593085) using a pool with only one thread can produce the following output:
[main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test with immediate lock
[main] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [203002322]
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[main] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [125593085]
[main] INFO  pl.mariusz.marciniak.locking.aop.ResourceLockAspect - Cannot perform operation complicated Buy operation on account. Locked resources 203002322
[main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ReleaseResourcesAspect - releasing resources
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.AsyncTransactionProcessingLogic - operation finished
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - releasing lock on [203002322]
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ReleaseResourcesAspect - releasing resources
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.AsyncTransactionProcessingLogic - operation finished
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - releasing lock on [125593085]
The Buy operation couldn't access the locked resources when it was invoked, so the functionality works as desired.
