28 April, 2013

Operation Logic Executor - Combine fake locking with asynchronous execution

   Combining both solutions is trivial because both are implemented as aspects: I only need to add @ResourceLock to my asynchronous method.
@Component
public class StringProcessingLogic implements OperationLogic<String> {

    private final Logger logger = LogManager.getLogger(StringProcessingLogic.class); 

    @ResourceLock
    @Async("singleOperationExecutor")
    public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
        boolean result = false;
        try {
            long startTime = System.currentTimeMillis();
            do {
                logger.info("processing " + data);
                Thread.sleep(500);
            } while (startTime + 10000 > System.currentTimeMillis());
            result = true;
        } catch (InterruptedException e) {
            logger.error(e);
        }
        return new AsyncResult<Boolean>(result);
    }

}
Let's check the result
20:37:10.603 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
20:37:10.629 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
20:37:10.635 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - acquiring lock on really long calculations::calculation data
20:37:10.635 [singleOperationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
...
20:37:20.137 [singleOperationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
20:37:20.638 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - releasing lock on really long calculations::calculation data
20:37:20.638 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - acquiring lock on transformation::transformed data
20:37:20.638 [singleOperationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...
20:37:30.138 [singleOperationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
20:37:30.638 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - releasing lock on transformation::transformed data

I used an executor with a single thread, so the operations run one after another, as desired. The most important thing is that asynchronous execution takes precedence over locking: the lock is acquired and released on the executor thread, around the actual work. That is very good. Otherwise the resources would be locked, the asynchronously invoked method would return immediately, and the resources would be released before the asynchronous execution finished. So that’s REALLY great. Nice. Yeah, but why?
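Before answering that, the difference between the two orderings can be illustrated without Spring. The following is only a plain-JDK sketch: the class and method names are hypothetical, the "lock" is just a log entry, and the latch stands in for work that takes longer than the submitting call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK model of the two possible orderings; this is not Spring code.
public class LockOrderingSketch {

    // Locking wraps the asynchronous call: submit() returns at once,
    // so the "lock" is released while the work is still pending.
    public static List<String> lockAroundAsync() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch released = new CountDownLatch(1);
        try {
            events.add("acquire");
            Future<Void> work = executor.submit(() -> {
                released.await(); // stands in for work that outlives the caller
                events.add("work done");
                return null;
            });
            events.add("release");
            released.countDown();
            work.get();
        } finally {
            executor.shutdown();
        }
        return events;
    }

    // The asynchronous call wraps locking: acquire and release both happen
    // on the worker thread, around the actual work.
    public static List<String> asyncAroundLock() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> work = executor.submit(() -> {
                events.add("acquire");
                events.add("work done");
                events.add("release");
            });
            work.get();
        } finally {
            executor.shutdown();
        }
        return events;
    }
}
```

In the first variant the release is recorded before the work completes, which is exactly the situation I want to avoid. The second variant matches the log above.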

What exactly defines the order in which different aspects are invoked at the same join point? Well, the Spring documentation states:
When two pieces of advice defined in different aspects both need to run at the same join point, unless you specify otherwise the order of execution is undefined. You can control the order of execution by specifying precedence. This is done in the normal Spring way by either implementing the org.springframework.core.Ordered interface in the aspect class or annotating it with the Order annotation. Given two aspects, the aspect returning the lower value from Ordered.getValue() (or the annotation value) has the higher precedence.
For test purposes I will change the order of my locking annotation so that it is executed before @Async:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Order(Ordered.HIGHEST_PRECEDENCE)
public @interface ResourceLock {
}
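In fact, setting @ResourceLock to have the highest precedence (@Order equal to the lowest integer value) doesn’t change the application’s behaviour. This is because AsyncAnnotationBeanPostProcessor sets its beforeExistingAdvisors property to true and always adds the @Async advisor before the others. Note that Spring reads the order from the aspect class itself (Ordered or @Order on ResourceLockAspect), so that is the more direct place to set it; as far as I can tell, the outcome is the same either way. According to the Spring creators: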
@Async always needs to be the first Advisor in the chain in order to provide meaningful around-invocation semantics 
It means that Spring provides correct ordering and I don't need to change anything in my code.
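The rule can be pictured with a small model. This is only a simplified sketch and not Spring's actual implementation: the Advisor class, the buildChain method and the "audit" advisor are hypothetical names introduced for illustration. Existing advisors are sorted by their order value, and the async advisor is then placed at the front without its order being consulted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified model of advisor ordering; the names are hypothetical, not Spring's.
public class AdvisorChainSketch {

    public static final class Advisor {
        final String name;
        final int order;

        public Advisor(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    public static List<String> buildChain(List<Advisor> existing, Advisor asyncAdvisor) {
        List<Advisor> chain = new ArrayList<>(existing);
        // A lower order value means higher precedence, so it sorts to the front.
        chain.sort(Comparator.comparingInt((Advisor a) -> a.order));
        // Models beforeExistingAdvisors = true: the order value is not consulted.
        chain.add(0, asyncAdvisor);
        List<String> names = new ArrayList<>();
        for (Advisor advisor : chain) {
            names.add(advisor.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Advisor> existing = Arrays.asList(
                new Advisor("audit", 10),
                new Advisor("resourceLock", Integer.MIN_VALUE));
        // prints [async, resourceLock, audit]
        System.out.println(buildChain(existing, new Advisor("async", Integer.MAX_VALUE)));
    }
}
```

Even with the lowest possible order value, the locking advisor ends up behind the async one in this model.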

Operation Logic Executor - Spring Aspects

   I want to base my locking mechanism on Spring Aspects. So first let's briefly introduce a simple example of using AOP. The aspect will be executed for all methods annotated with the ResourceLock annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ResourceLock {
}
The next preparation step is to add the spring-aspects artifact to the dependencies from the previous post.
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>3.2.2.RELEASE</version>
   </dependency>
Now I can implement the aspect.
@Aspect
@Component
public class ResourceLockAspect {

    private Logger logger = LogManager.getLogger(ResourceLockAspect.class);

    @Around("@annotation(pl.mariusz.marciniak.locking.annotations.ResourceLock)")
    public Object execute(ProceedingJoinPoint pjp) throws Throwable {
        logger.debug("acquiring lock on " + pjp.getArgs()[0]);
        try {
            // pass the target's result through, so a returned Future is not lost
            return pjp.proceed();
        } finally {
            // release the lock even when the target method throws
            logger.debug("releasing lock on " + pjp.getArgs()[0]);
        }
    }
}
The around advice type means that the advice surrounds the method invocation. I need that to lock resources before the method runs and to release them after it finishes. As defined before, the pointcut expression invokes the advice for each method annotated with ResourceLock. So let's define a bean with such a method.
@Component
public class ObjectUsingFakeLocking {

    private final Logger logger = LogManager.getLogger(ObjectUsingFakeLocking.class);

    public void executeNormalMethod(String parameter) {
        logger.info("executeNormalMethod - " + parameter);
    }

    @ResourceLock
    public void executeMethodLockingResources(String parameter) {
        logger.info("executeMethodLockingResources - " + parameter);

    }
}
Execution of both ObjectUsingFakeLocking methods
    private void testFakeLocking() {
        ObjectUsingFakeLocking obj = appContext.getBean(ObjectUsingFakeLocking.class);
        String param = "parameter";
        obj.executeNormalMethod(param);
        obj.executeMethodLockingResources(param);
        
    }
prints
19:40:34.192 [main] INFO  pl.mariusz.marciniak.locking.ObjectUsingFakeLocking - executeNormalMethod - parameter
19:40:34.198 [main] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - acquiring lock on parameter
19:40:34.198 [main] INFO  pl.mariusz.marciniak.locking.ObjectUsingFakeLocking - executeMethodLockingResources - parameter
19:40:34.198 [main] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - releasing lock on parameter
Okay, so my aspect works as desired. The next step is to combine it with asynchronous execution.
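For readers who wonder what "surrounding" a method invocation looks like without any framework, here is a rough sketch using a JDK dynamic proxy. It is not how I configured Spring above, only an illustration of the same idea: the Locked annotation, the Service interface and the class name are hypothetical, and the "lock" is again just a log entry.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Around-advice idea modelled with a JDK dynamic proxy; all names are hypothetical.
public class AroundAdviceSketch {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Locked {
    }

    public interface Service {
        void normal(String parameter);

        @Locked
        void locking(String parameter);
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        Service target = new Service() {
            public void normal(String parameter) {
                log.add("normal - " + parameter);
            }

            public void locking(String parameter) {
                log.add("locking - " + parameter);
            }
        };
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(Locked.class)) {
                return method.invoke(target, args); // not advised: call straight through
            }
            log.add("acquiring lock on " + args[0]);
            try {
                return method.invoke(target, args); // the equivalent of pjp.proceed()
            } finally {
                log.add("releasing lock on " + args[0]);
            }
        };
        Service service = (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
        service.normal("parameter");
        service.locking("parameter");
        return log;
    }
}
```

The returned log has the same shape as the output above: the unannotated method is logged on its own, while the annotated one is wrapped by the acquire and release entries.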

18 April, 2013

Operation Logic Executor - Asynchronous execution

   So let's start with something really easy. I will add asynchronous execution to the OperationLogic.executeOperation method.

The current declaration of this method doesn’t return any value, but throws OperationFailedException. It means that, when the operation is executed asynchronously, the caller cannot check what caused a failure. That’s why I chose to change the executeOperation method to:
public interface OperationLogic<T> {
    Future<Boolean> executeOperation(OperationData<T> data) throws OperationFailedException;
}
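With a Future as the return type, a failure inside the operation reaches the caller when it calls get(): the original exception arrives wrapped in an ExecutionException. The sketch below shows this with a plain JDK executor instead of Spring; the nested OperationFailedException is a hypothetical stand-in for the project's class, and the message is made up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureFailureSketch {

    // Hypothetical stand-in for the project's exception type.
    public static class OperationFailedException extends Exception {
        public OperationFailedException(String message) {
            super(message);
        }
    }

    public static String failureMessage() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Boolean> operation = () -> {
                throw new OperationFailedException("simulated failure");
            };
            Future<Boolean> future = executor.submit(operation);
            try {
                future.get(); // blocks until the task finishes or fails
                return "no failure";
            } catch (ExecutionException e) {
                // the exception thrown on the worker thread is the cause
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            }
        } finally {
            executor.shutdown();
        }
    }
}
```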
Now I can check a thrown exception using the Future object: Future.get() rethrows it wrapped in an ExecutionException. I also added generics to avoid casting. Before implementing the first sample OperationLogic class, I need to add some dependencies to my Maven project:
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>3.2.2.RELEASE</version>
    <type>jar</type>
   </dependency>
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>3.2.2.RELEASE</version>
   </dependency>
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>3.2.2.RELEASE</version>
   </dependency>
   <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.0-beta4</version>
    <type>jar</type>
   </dependency>

Okay, I added the core, context and aspects Spring jars required to use @Async, plus the log4j library for logging.
@Component
public class StringProcessingLogic implements OperationLogic<String> {

    private final Logger logger = LogManager.getLogger(StringProcessingLogic.class); 
    
    public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
        boolean result = false;
        try {
            long startTime = System.currentTimeMillis();
            do {
                logger.info("processing " + data);
                Thread.sleep(500);
            } while (startTime + 10000 > System.currentTimeMillis());
            result = true;
        } catch (InterruptedException e) {
            logger.error(e);
        }
        return new AsyncResult<Boolean>(result);
    }

}
Now I only need to define the packages which Spring should scan for beans
    <context:component-scan base-package="pl.mariusz.marciniak.async" />
and I can test the current solution. Here is the code that I executed:
private void testAsyncString() throws OperationFailedException {
    logger.info("starting asynchronous test");
    OperationLogic<String> logic = (OperationLogic<String>)appContext.getBean("stringProcessingLogic",OperationLogic.class);
    OperationData<String> calculationData = new OperationData<String>("really long calculations");
    calculationData.setValueObject("calculation data");
    logic.executeOperation(calculationData);
    OperationData<String> transformationData = new OperationData<String>("transformation");
    transformationData.setValueObject("transformed data");
    logic.executeOperation(transformationData);
    logger.info("executing other logic ...");
}
If the OperationData.toString method is overridden, you get output similar to this:
23:21:28.773 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
23:21:28.775 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:21:29.275 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
...
23:21:38.275 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:21:38.777 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:21:39.277 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...
23:21:48.277 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:21:48.777 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
What we have here is standard synchronous execution. To make it asynchronous I need a ThreadPoolTaskExecutor. With Spring this is easy: I add an executor definition to the configuration file and register it as the executor used by the @Async annotation
    <task:executor id="operationExecutor" pool-size="5"/>
    <task:annotation-driven executor="operationExecutor" />
Then I annotate StringProcessingLogic.executeOperation
@Async
public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
...
}
and run my test one more time
23:46:26.887 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
23:46:26.892 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
23:46:26.892 [operationExecutor-2] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:46:26.892 [operationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:46:27.392 [operationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:46:27.392 [operationExecutor-2] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...

As we can see, both executeOperation calls returned immediately, which is why “executing other logic…” appears right after “starting asynchronous test”. Also, the transformation and calculation operations run in parallel and are no longer executed by the main thread.
What if I want asynchronous but not concurrent execution for some operations? Of course I can define a pool with only one thread. So let's add another executor pool
    <task:executor id="singleOperationExecutor" pool-size="1"/>
Almost done. The problem is that @Async still uses the default executor configured in annotation-driven. So I need to name the executor explicitly on my non-concurrent operation
@Async("singleOperationExecutor")
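The effect of a one-thread pool can be reproduced with the plain JDK. The sketch below is not the Spring configuration itself, just an illustration under the assumption that a pool of size 1 behaves like a single worker with a FIFO queue; the class name and the task names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadSketch {

    public static List<String> runOnSingleThread() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        // One worker thread: submitted tasks are queued and run one at a time.
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String name : new String[] {"calculation", "transformation"}) {
                // submit() returns immediately; the caller is not blocked
                pending.add(executor.submit(() -> {
                    events.add("start " + name);
                    events.add("end " + name);
                }));
            }
            for (Future<?> future : pending) {
                future.get(); // wait for both tasks before reading the events
            }
        } finally {
            executor.shutdown();
        }
        return events;
    }
}
```

The second task starts only after the first one has ended, so the operations stay asynchronous for the caller but never overlap.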

17 April, 2013

Operation Logic Executor - Assignment Description

  "Operation Logic Executor" will be a series of posts related to a task for which I need to prepare a proof of concept.

Assignment Details 
The system contains various operations defined in classes implementing the OperationLogic interface. Based on the provided data, it chooses the correct operation and executes it. Currently all executions are synchronous. I need to add asynchronous execution and a configurable locking mechanism, using Spring Framework. The declaration of OperationLogic can be changed.
public interface OperationLogic {
    void executeOperation(OperationData data) throws OperationFailedException;
}
Let's start...