18 April, 2013

Operation Logic Executor - Asynchronous execution

Let's start with something really easy: I will add asynchronous execution to the OperationLogic.executeOperation method.

The current declaration of this method doesn’t return any value, but it throws OperationFailedException. This means that I cannot check what caused a failure when the operation is executed asynchronously, because the exception is raised on another thread after the call has already returned. That’s why I chose to change the executeOperation method to:
public interface OperationLogic<T> {
    Future<Boolean> executeOperation(OperationData<T> data) throws OperationFailedException;
}
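How a Future reports a failure to the caller can be sketched with plain java.util.concurrent, without Spring. This is only an illustration under assumptions: the OperationFailedException class below is a hypothetical stand-in for the real one, and FutureFailureDemo and runAndDescribe are names invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the post's OperationFailedException.
class OperationFailedException extends Exception {
    OperationFailedException(String message) {
        super(message);
    }
}

class FutureFailureDemo {

    // Runs a task on a background thread and describes how it ended.
    static String runAndDescribe(final boolean shouldFail) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> future = executor.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws OperationFailedException {
                    if (shouldFail) {
                        throw new OperationFailedException("invalid input");
                    }
                    return Boolean.TRUE;
                }
            });
            // get() blocks until the task finishes; an exception thrown inside
            // the task is rethrown here wrapped in an ExecutionException.
            return "result: " + future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName() + " - " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndDescribe(false));
        System.out.println(runAndDescribe(true));
    }
}
```

The caller learns about the failure only when it asks the Future for its result, and the original exception is available through getCause().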
With this signature I can check the thrown exception through the Future object. I also added generics to avoid casting. Before implementing the first sample OperationLogic class, I need to add some dependencies to my Maven project:
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>3.2.2.RELEASE</version>
    <type>jar</type>
   </dependency>
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>3.2.2.RELEASE</version>
   </dependency>
   <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>3.2.2.RELEASE</version>
   </dependency>
   <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.0-beta4</version>
    <type>jar</type>
   </dependency>

Okay, I added the core, context and aspects Spring jars required to use @Async, plus the log4j library for logging.
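import java.util.concurrent.Future;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

@Component
public class StringProcessingLogic implements OperationLogic<String> {

    private final Logger logger = LogManager.getLogger(StringProcessingLogic.class);

    public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
        boolean result = false;
        try {
            long startTime = System.currentTimeMillis();
            // simulate about 10 seconds of work, logging every 500 ms
            do {
                logger.info("processing " + data);
                Thread.sleep(500);
            } while (startTime + 10000 > System.currentTimeMillis());
            result = true;
        } catch (InterruptedException e) {
            logger.error("operation interrupted", e);
            // restore the interrupt flag so the executing thread can react to it
            Thread.currentThread().interrupt();
        }
        // wrap the outcome in a Future that is already completed
        return new AsyncResult<Boolean>(result);
    }

}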
Now I only need to define the packages that Spring should scan for beans:
    <context:component-scan base-package="pl.mariusz.marciniak.async" />
and I can test the current solution. Here is the code that I executed:
private void testAsyncString() throws OperationFailedException {
    logger.info("starting asynchronous test");
    OperationLogic<String> logic = (OperationLogic<String>)appContext.getBean("stringProcessingLogic",OperationLogic.class);
    OperationData<String> calculationData = new OperationData<String>("really long calculations");
    calculationData.setValueObject("calculation data");
    logic.executeOperation(calculationData);
    OperationData<String> transformationData = new OperationData<String>("transformation");
    transformationData.setValueObject("transformed data");
    logic.executeOperation(transformationData);
    logger.info("executing other logic ...");
}
If the OperationData.toString method is overridden, you get output similar to this:
23:21:28.773 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
23:21:28.775 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:21:29.275 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
...
23:21:38.275 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:21:38.777 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:21:39.277 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...
23:21:48.277 [main] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:21:48.777 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
What we have here is standard synchronous execution. To make it asynchronous, I need to create a ThreadPoolTaskExecutor. This is easy with Spring: add a ThreadPoolTaskExecutor definition to the configuration file and attach this pool so that it is used for the @Async annotation:
    <task:executor id="operationExecutor" pool-size="5"/>
    <task:annotation-driven executor="operationExecutor" />
Then I annotate StringProcessingLogic.executeOperation:
@Async
public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
...
}
and run my test one more time:
23:46:26.887 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
23:46:26.892 [main] INFO  pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
23:46:26.892 [operationExecutor-2] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:46:26.892 [operationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:46:27.392 [operationExecutor-1] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:46:27.392 [operationExecutor-2] INFO  pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...

As we can see, the executeOperation calls returned immediately, which is why “executing other logic ...” appears right after “starting asynchronous test”. Also, the transformation and calculation operations run in parallel and are no longer executed by the main thread.
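The same behaviour can be reproduced in a small sketch without Spring, assuming a plain fixed-size pool from java.util.concurrent in place of the Spring-managed executor. ImmediateReturnDemo and run are hypothetical names, and a CountDownLatch stands in for the long-running work so that the order of events is predictable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ImmediateReturnDemo {

    // Submits two tasks to a pool of five threads and returns the events
    // seen by the calling thread, in the order they happened.
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        final CountDownLatch release = new CountDownLatch(1);
        StringBuilder events = new StringBuilder();
        try {
            Callable<Boolean> task = new Callable<Boolean>() {
                @Override
                public Boolean call() throws InterruptedException {
                    release.await(); // stands in for the long-running work
                    return Boolean.TRUE;
                }
            };
            Future<Boolean> calculation = pool.submit(task);
            Future<Boolean> transformation = pool.submit(task);

            // submit() has returned although neither task can have finished yet
            boolean pending = !calculation.isDone() && !transformation.isDone();
            events.append("pending=").append(pending);
            events.append(";other logic");

            release.countDown(); // let both tasks complete
            // get() waits for each task and hands back its result
            events.append(";results=").append(calculation.get())
                  .append(',').append(transformation.get());
        } catch (ExecutionException e) {
            events.append(";failed: ").append(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            events.append(";interrupted");
        } finally {
            pool.shutdown();
        }
        return events.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // pending=true;other logic;results=true,true
    }
}
```

In the test method from this post the returned Future objects are ignored; keeping them, as in the sketch, is what would let the caller wait for the outcome later.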
What if I want asynchronous, but not concurrent, execution for some operations? I can define a pool with only one thread. So let's add another executor pool:
    <task:executor id="singleOperationExecutor" pool-size="1"/>
Almost done. The problem is that @Async still uses the default executor set in the annotation-driven element. So I need to name the executor explicitly on my non-concurrent operation:
@Async("singleOperationExecutor")
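Why a pool of one thread gives asynchronous but not concurrent execution can be checked with a small sketch in plain java.util.concurrent. It is not the Spring configuration itself: SingleThreadPoolDemo and maxConcurrentTasks are hypothetical names, and the sketch simply counts how many tasks are ever running at the same moment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SingleThreadPoolDemo {

    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // the highest number of tasks that were running at the same moment.
    static int maxConcurrentTasks(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger maxRunning = new AtomicInteger();
        try {
            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws InterruptedException {
                        int now = running.incrementAndGet();
                        // record the peak with a compare-and-set loop
                        int seen = maxRunning.get();
                        while (now > seen && !maxRunning.compareAndSet(seen, now)) {
                            seen = maxRunning.get();
                        }
                        Thread.sleep(50); // simulated work
                        running.decrementAndGet();
                        return Boolean.TRUE;
                    }
                }));
            }
            // submission returned immediately; now wait for every task
            for (Future<Boolean> future : futures) {
                future.get();
            }
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("pool of 1: " + maxConcurrentTasks(1, 3));
        System.out.println("pool of 5: " + maxConcurrentTasks(5, 3));
    }
}
```

With one thread the tasks queue up and run one after another, so the peak is always 1. With five threads the peak depends on scheduling and will usually be higher.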