22 May, 2013

Operation Logic Executor - Implementation of locking mechanism

In the previous post, I showed that asynchronous execution and resource locking can be combined easily. Now I can implement the locking mechanism. I will use the simple idea of storing the identifiers of locked resources in a set, but nothing prevents you from building more complex solutions. First of all, @ResourceLock should be configurable and let the user plug in any locking method. That is why I extended the previous @ResourceLock annotation to:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ResourceLock {
    Class<? extends IdentifierBuilder<?>> identifierBuilder();
    Class<? extends ResourceLocker> resourceLocker();
    String resourceLockerBeanName();
}
I added resourceLocker and resourceLockerBeanName to fetch the bean containing the locking logic. The user can also define an identifierBuilder bean, which creates unique identifiers for resources. I didn’t add a bean name for the builder, because I don’t want programmers to customize the builder logic, while for resourceLocker it can be useful (e.g. a locker that inserts blocked identifiers into tables, where the programmer only changes the query for the same type of bean).
public interface IdentifierBuilder<T> {
    long[] getIdentifiers(OperationData<T> operationLogicArgs);
}

I decided to return identifiers as an array of longs. This means that the programmer has to take care of mapping keys of other types, or complex keys, to long values. I’m aware that such a limitation can be inconvenient and error-prone, but for the purpose of this example I will leave it as it is.
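To illustrate what such a mapping could look like, here is a minimal sketch that packs a two-part key into one long. CompositeKeys and its methods are hypothetical names, not part of the project, and the approach only works when both parts fit into an int. Keys that must be hashed (e.g. strings) may collide, which would make unrelated resources block each other.

```java
// Hypothetical helper, not part of the original project: packs two int key
// parts into a single long that an IdentifierBuilder could return.
final class CompositeKeys {
    private CompositeKeys() {}

    // High 32 bits hold the first part, low 32 bits hold the second part.
    static long pack(int high, int low) {
        return ((long) high << 32) | (low & 0xFFFFFFFFL);
    }

    // Recovers the first part from a packed identifier.
    static int highPart(long packed) {
        return (int) (packed >>> 32);
    }

    // Recovers the second part from a packed identifier.
    static int lowPart(long packed) {
        return (int) packed;
    }
}

class CompositeKeysDemo {
    public static void main(String[] args) {
        // Assumed example values: a branch number and an account number.
        long id = CompositeKeys.pack(7, 42);
        System.out.println(id + " -> " + CompositeKeys.highPart(id) + "/" + CompositeKeys.lowPart(id));
    }
}
```

Because the packing is reversible, two different (high, low) pairs never produce the same identifier.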
public interface ResourceLocker {
    boolean lock(long... identifiers);
    boolean unlock(long... identifiers);
    boolean isLocked(long identifier);
}
Here, too, it would be much better to throw an exception than to return a boolean value, which gives no information about what went wrong.
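An exception-based variant could look roughly like the sketch below. ResourceAlreadyLockedException, ThrowingResourceLocker and InMemoryThrowingLocker are hypothetical names introduced only for illustration; the rest of this post keeps the boolean interface.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical exception type: carries the identifier that blocked the request.
class ResourceAlreadyLockedException extends Exception {
    private final long identifier;

    ResourceAlreadyLockedException(long identifier) {
        super("Resource " + identifier + " is already locked");
        this.identifier = identifier;
    }

    long getIdentifier() {
        return identifier;
    }
}

// Hypothetical variant of ResourceLocker that reports failures via exceptions.
interface ThrowingResourceLocker {
    void lock(long... identifiers) throws ResourceAlreadyLockedException;
    void unlock(long... identifiers);
}

// Minimal in-memory implementation: either all identifiers are locked or none.
class InMemoryThrowingLocker implements ThrowingResourceLocker {
    private final Set<Long> locked = new HashSet<Long>();

    public synchronized void lock(long... identifiers) throws ResourceAlreadyLockedException {
        // First pass: fail fast without modifying the set.
        for (long id : identifiers) {
            if (locked.contains(id)) {
                throw new ResourceAlreadyLockedException(id);
            }
        }
        // Second pass: nothing was locked, so take everything.
        for (long id : identifiers) {
            locked.add(id);
        }
    }

    public synchronized void unlock(long... identifiers) {
        for (long id : identifiers) {
            locked.remove(id);
        }
    }
}

class ThrowingLockerDemo {
    public static void main(String[] args) {
        ThrowingResourceLocker locker = new InMemoryThrowingLocker();
        try {
            locker.lock(1L, 2L);
            locker.lock(2L, 3L); // fails because 2 is already locked
        } catch (ResourceAlreadyLockedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The caller now learns which identifier caused the refusal, which the boolean result cannot express.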
Before implementing the interfaces, let’s present the resource class.
public class Transaction {
    private long   accountNumber;
    private String transactionType;

    public long getAccountNumber() {
        return accountNumber;
    }
    public void setAccountNumber(long accountNumber) {
        this.accountNumber = accountNumber;
    }
    public String getTransactionType() {
        return transactionType;
    }
    public void setTransactionType(String transactionType) {
        this.transactionType = transactionType;
    }
}
For objects of this type I define an operation similar to the one used for string processing in the prior post:
@Component
public class TransactionProcessingLogic implements OperationLogic<Transaction> {

    private Logger logger = LogManager.getLogger(TransactionProcessingLogic.class);

    @ResourceLock(
            identifierBuilder=TransactionIdentifierBuilder.class,
            resourceLocker=TransactionLocker.class,
            resourceLockerBeanName="transactionLocker"
    )
    @Async
    public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
        boolean result = false;
        try {
            Object obj = data.getValueObject();
            if (obj instanceof Transaction) {
                Transaction transaction = (Transaction) obj;
                long startTime = System.currentTimeMillis();
                do {
                    logger.debug("processing " + transaction.getTransactionType() + " operation on account number: " + transaction.getAccountNumber());
                    Thread.sleep(500);
                } while (startTime + 10000 > System.currentTimeMillis());
                result = true;
            }
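        } catch(InterruptedException e) {
            logger.error(e);
            // Restore the interrupt flag so the executor can react to the interruption.
            Thread.currentThread().interrupt();
        }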
        return new AsyncResult<Boolean>(result);
    }

}
Only transactions for different accounts can be processed simultaneously, so the identifier builder can simply return the account number.
@Component
public class TransactionIdentifierBuilder implements IdentifierBuilder<Transaction> {
    public long[] getIdentifiers(OperationData<Transaction> operationData) {
        return new long[] {operationData.getValueObject().getAccountNumber()};
    }
}
Having the identifiers, I can implement ResourceLocker.
@Component
public class TransactionLocker implements ResourceLocker {

    private final Logger logger = LogManager.getLogger(TransactionLocker.class);

    private Set<Long>    lockedResources;

    @PostConstruct
    private void init() {
        lockedResources = new HashSet<Long>();
    }

    public boolean lock(long... identifiers) {
        Set<Long> resourcesIdentifiers = new HashSet<Long>();
        synchronized (lockedResources) {
            for (long identifier : identifiers) {
                if (lockedResources.contains(identifier)) {
                    return false;
                }
                resourcesIdentifiers.add(identifier);
            }
            logger.debug("acquiring lock on " + resourcesIdentifiers);
            lockedResources.addAll(resourcesIdentifiers);
        }
        return true;
    }

    public boolean unlock(long... identifiers) {
        Set<Long> resourcesIdentifiers = new HashSet<Long>();
        synchronized (lockedResources) {
            for (long identifier : identifiers) {
                if (lockedResources.contains(identifier)) {
                    resourcesIdentifiers.add(identifier);
                }
            }
            logger.debug("releasing lock on " + resourcesIdentifiers);
            lockedResources.removeAll(resourcesIdentifiers);
        }
        return true;
    }

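    public boolean isLocked(long identifier) {
        // HashSet is not thread-safe, so reads use the same monitor as lock() and unlock().
        synchronized (lockedResources) {
            return lockedResources.contains(identifier);
        }
    }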

}
As I wrote previously, this simple implementation is based on storing object identifiers in a set. I moved the logging that was initially in the aspect into this class. In fact, there are many more changes in the aspect code.
@Aspect
@Component
public class ResourceLockAspect {

    private Logger             logger = LogManager.getLogger(ResourceLockAspect.class);

    @Autowired
    private ApplicationContext appContext;

    @Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))"
           +" && @annotation(pl.mariusz.marciniak.locking.annotations.ResourceLock)")
    private <T> Object execute(ProceedingJoinPoint pjp) throws Throwable {
        Object result = null;
        ResourceLock resourceLock = fetchAnnotationData(pjp);
        IdentifierBuilder<T> builder = resolveIdentifierBuilder(resourceLock);
        ResourceLocker locker = resolveResourceLocker(resourceLock);
        OperationData<T> operationData = (OperationData<T>) pjp.getArgs()[0];
        long[] objectsToLockIdentifiers = createIdentifiers(builder, operationData);
        if (locker.lock(objectsToLockIdentifiers)) {
            try {
                result = pjp.proceed();
            } finally {
                // Release the lock even if the operation throws.
                locker.unlock(objectsToLockIdentifiers);
            }
        } else {
            logger.info("Cannot perform operation " + operationData.getName() + ". Locked resources "
                            + new StrBuilder().appendWithSeparators(ArrayUtils.toObject(objectsToLockIdentifiers), ",").toString());
        }
        return result;
    }

    private <T> long[] createIdentifiers(IdentifierBuilder<T> builder, OperationData<T> operationData) {
        return builder.getIdentifiers(operationData);
    }

    private <T> IdentifierBuilder<T> resolveIdentifierBuilder(ResourceLock annotation) {
        Class<? extends IdentifierBuilder<T>> identifierBuilder = (Class<? extends IdentifierBuilder<T>>) annotation.identifierBuilder();
        return appContext.getBean(identifierBuilder);
    }

    private ResourceLocker resolveResourceLocker(ResourceLock annotation) {
        Class<? extends ResourceLocker> resourceLockerClass = annotation.resourceLocker();
        String resourceLockerBeanName = annotation.resourceLockerBeanName();
        return appContext.getBean(resourceLockerBeanName, resourceLockerClass);
    }

    private ResourceLock fetchAnnotationData(ProceedingJoinPoint pjp) throws NoSuchMethodException {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        String methodName = signature.getMethod().getName();
        Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
        ResourceLock annotation = pjp.getTarget().getClass().getMethod(methodName, parameterTypes).getAnnotation(ResourceLock.class);
        return annotation;
    }
}

First of all, I changed the pointcut of my aspect. Previously the advice was executed for every method annotated with @ResourceLock. Now the method must be named executeOperation, take a pl.mariusz.marciniak.operations.data.OperationData parameter and belong to an object implementing pl.mariusz.marciniak.operations.logic.OperationLogic. In line 14 I fetch the annotation from the method declaration and then use it to retrieve the builder and locker beans. Using the builder bean I can turn the objects stored in the operationData argument into an array of identifiers. These are passed to the locking mechanism.
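Stripped of Spring, the flow of the advice (try to lock, run the operation, release the lock) can be sketched as below. SimpleLocker and LockGuard are hypothetical stand-ins for the locker bean and the aspect, and the sketch assumes the lock should be released even when the operation fails.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the ResourceLocker bean.
class SimpleLocker {
    private final Set<Long> locked = new HashSet<Long>();

    // All-or-nothing: returns false if any identifier is already locked.
    synchronized boolean lock(long... ids) {
        for (long id : ids) {
            if (locked.contains(id)) {
                return false;
            }
        }
        for (long id : ids) {
            locked.add(id);
        }
        return true;
    }

    synchronized void unlock(long... ids) {
        for (long id : ids) {
            locked.remove(id);
        }
    }

    synchronized boolean isLocked(long id) {
        return locked.contains(id);
    }
}

// Hypothetical stand-in for the advice body.
final class LockGuard {
    private LockGuard() {}

    // Runs the action only if every identifier could be locked; returns null
    // when the resources are busy, mirroring the null result of the advice.
    static <R> R runLocked(SimpleLocker locker, long[] ids, Callable<R> action) throws Exception {
        if (!locker.lock(ids)) {
            return null;
        }
        try {
            return action.call();
        } finally {
            // Release the lock whether the action returned or threw.
            locker.unlock(ids);
        }
    }
}

class LockGuardDemo {
    public static void main(String[] args) throws Exception {
        SimpleLocker locker = new SimpleLocker();
        String result = LockGuard.runLocked(locker, new long[] {1L}, () -> "processed");
        System.out.println(result + ", still locked: " + locker.isLocked(1L));
    }
}
```

Note that a refused call does not touch the locks held by someone else; only the caller that acquired them releases them.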
The methods for fetching beans and creating identifiers are straightforward, so I won’t describe them. More interesting is how the annotation is fetched. I get the method name and parameter types from the join point and then look up the matching method on the target class, which also exposes the annotation. In fact, there is an easier way to get the annotation of a join point in Spring.
    @Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))"
           +" && @annotation(resourceLock)")
    private <T> Object execute(ProceedingJoinPoint pjp, ResourceLock resourceLock) throws Throwable {

In this case the annotation is added to the advice method’s parameters, and the pointcut definition is changed to refer to this additional parameter: @annotation(resourceLock).
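For comparison, the reflective lookup used in the first version of the aspect relies only on standard java.lang.reflect calls and can be reproduced outside Spring. In the sketch below, Guarded, SampleLogic and AnnotationLookup are hypothetical names standing in for @ResourceLock, the operation logic and the aspect helper. Keep in mind that Class.getMethod finds only public methods and that the annotation needs RUNTIME retention to be visible.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation standing in for @ResourceLock.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Guarded {
    String lockerBeanName();
}

// Hypothetical class standing in for an OperationLogic implementation.
class SampleLogic {
    @Guarded(lockerBeanName = "transactionLocker")
    public String executeOperation(String data) {
        return data;
    }
}

final class AnnotationLookup {
    private AnnotationLookup() {}

    // Finds the public method by name and parameter types on the target's
    // class, then reads the annotation from it (null if it is not present).
    static Guarded find(Object target, String methodName, Class<?>... parameterTypes)
            throws NoSuchMethodException {
        Method method = target.getClass().getMethod(methodName, parameterTypes);
        return method.getAnnotation(Guarded.class);
    }
}

class AnnotationLookupDemo {
    public static void main(String[] args) throws NoSuchMethodException {
        Guarded guarded = AnnotationLookup.find(new SampleLogic(), "executeOperation", String.class);
        System.out.println(guarded.lockerBeanName());
    }
}
```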

Let’s check whether the locking mechanism works correctly. I used an executor with a pool size of 5 and the following data:
    private void testAsyncTransaction() throws OperationFailedException {
        logger.info("starting asynchronous test");
        OperationLogic<Transaction> logic = appContext.getBean("transactionProcessingLogic",OperationLogic.class);
        OperationData<Transaction> sellTransactionDataForAccount1 = prepareOperationDataForTransaction(prepareTransaction(SELL_TRANSACTION, ACCOUNT_1));
        OperationData<Transaction> sellTransactionDataForAccount2 = prepareOperationDataForTransaction(prepareTransaction(SELL_TRANSACTION, ACCOUNT_2));
        OperationData<Transaction> buyTransactionDataForAccount1 = prepareOperationDataForTransaction(prepareTransaction(BUY_TRANSACTION, ACCOUNT_1));
        logic.executeOperation(sellTransactionDataForAccount1);
        logic.executeOperation(sellTransactionDataForAccount2);
        logic.executeOperation(buyTransactionDataForAccount1);
        logger.info("executing other logic ...");
    }

    private OperationData<Transaction> prepareOperationDataForTransaction(Transaction transaction) {
        OperationData<Transaction> transactionData = new TransactionData("complicated "+transaction.getTransactionType()+" operation on account");
        transactionData.setValueObject(transaction);
        return transactionData;
    }
    private Transaction prepareTransaction(String transactionType, long accountNumber) {
        Transaction transaction = new Transaction();
        transaction.setAccountNumber(accountNumber);
        transaction.setTransactionType(transactionType);
        return transaction;
    }

A possible output is:
22:36:29.954 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
22:36:29.960 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
22:36:29.968 [operationExecutor-3] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [203002322]
22:36:29.968 [operationExecutor-2] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [125593085]
22:36:29.968 [operationExecutor-3] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Buy operation on account number: 203002322
22:36:29.968 [operationExecutor-2] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Sell operation on account number: 125593085
22:36:29.980 [operationExecutor-1] INFO  pl.mariusz.marciniak.locking.aop.ResourceLockAspect - Cannot perform operation complicated Sell operation on account. Locked resources 203002322
22:36:30.468 [operationExecutor-3] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Buy operation on account number: 203002322
22:36:30.469 [operationExecutor-2] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Sell operation on account number: 125593085
...
The Sell operation at line 7 was blocked, as expected.