OperationLogic.executeOperation
method. The current declaration of this method does not return a value, but it throws
OperationFailedException. This means that I cannot check what caused a failure when the operation is
executed asynchronously. That is why I chose to change the executeOperation method to:

    public interface OperationLogic<T> {
        Future<Boolean> executeOperation(OperationData<T> data) throws OperationFailedException;
    }

Now I can check the thrown exception through the Future object: an exception thrown by the asynchronous call is rethrown by Future.get(), wrapped in an ExecutionException. I also added generics to avoid casting. Before implementing the first sample class of
OperationLogic, I need to add some dependencies to my Maven project:
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>3.2.2.RELEASE</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
        <version>3.2.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.0-beta4</version>
        <type>jar</type>
    </dependency>
I added the Spring core, context, and aspects jars required to use
@Async, plus the log4j library for logging.
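    import java.util.concurrent.Future;

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;

    @Component
    public class StringProcessingLogic implements OperationLogic<String> {
        private final Logger logger = LogManager.getLogger(StringProcessingLogic.class);

        public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
            boolean result = false;
            try {
                long startTime = System.currentTimeMillis();
                // Simulate about 10 seconds of work, logging every 500 ms.
                do {
                    logger.info("processing " + data);
                    Thread.sleep(500);
                } while (startTime + 10000 > System.currentTimeMillis());
                result = true;
            } catch (InterruptedException e) {
                logger.error("processing interrupted", e);
                // Restore the interrupt flag so the calling thread can react to it.
                Thread.currentThread().interrupt();
            }
            // AsyncResult wraps the value so it can be handed back as a Future.
            return new AsyncResult<Boolean>(result);
        }
    }

Now I only need to define the packages that Spring should scan for beans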

    <context:component-scan base-package="pl.mariusz.marciniak.async" />

and can test the current solution. Here is the code that I executed:

    private void testAsyncString() throws OperationFailedException {
        logger.info("starting asynchronous test");
        OperationLogic<String> logic =
                (OperationLogic<String>) appContext.getBean("stringProcessingLogic", OperationLogic.class);

        OperationData<String> calculationData = new OperationData<String>("really long calculations");
        calculationData.setValueObject("calculation data");
        logic.executeOperation(calculationData);

        OperationData<String> transformationData = new OperationData<String>("transformation");
        transformationData.setValueObject("transformed data");
        logic.executeOperation(transformationData);

        logger.info("executing other logic ...");
    }

If the OperationData.toString method is overridden, you can get output similar to this:

    23:21:28.773 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
    23:21:28.775 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
    23:21:29.275 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
    ...
    23:21:38.275 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
    23:21:38.777 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
    23:21:39.277 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
    ...
    23:21:48.277 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
    23:21:48.777 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...

What we have here is standard synchronous execution: every line is logged by the main thread, and the second operation starts only after the first one finishes. To make it asynchronous, I need to create a
ThreadPoolTaskExecutor. This is easy with Spring: add a ThreadPoolTaskExecutor definition
to the configuration file and attach this pool to the processing of the @Async
annotation:

    <task:executor id="operationExecutor" pool-size="5"/>
    <task:annotation-driven executor="operationExecutor" />

Then I annotate StringProcessingLogic.executeOperation

    @Async
    public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
        ...
    }

and run my test one more time:

    23:46:26.887 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
    23:46:26.892 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
    23:46:26.892 [operationExecutor-2] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
    23:46:26.892 [operationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
    23:46:27.392 [operationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
    23:46:27.392 [operationExecutor-2] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
    ...

As we can see, both executeOperation calls returned immediately, which is why
“executing other logic…” appears right after “starting asynchronous test”. Also, the
transformation and calculation operations run in parallel and are no longer executed
by the main thread.
What if I want some operation to execute asynchronously but not concurrently?
I can define a pool with only one thread. So let's add another executor pool:
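
The test above discards the returned Future objects, so here is a minimal sketch of how a caller could later inspect one to learn why an operation failed. It uses a plain JDK ExecutorService instead of Spring's @Async so that it runs on its own; the class FutureFailureSketch, the method runAndInspect, and the local OperationFailedException stand-in are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the post's OperationFailedException.
class OperationFailedException extends Exception {
    OperationFailedException(String message) {
        super(message);
    }
}

class FutureFailureSketch {

    // Submits one task to a small pool and describes what its Future reveals.
    static String runAndInspect(final boolean shouldFail) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Boolean> future = pool.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws OperationFailedException {
                    if (shouldFail) {
                        throw new OperationFailedException("bad input");
                    }
                    return Boolean.TRUE;
                }
            });
            try {
                // get() blocks until the task finishes.
                return "result=" + future.get();
            } catch (ExecutionException e) {
                // The exception thrown inside the task is available as the cause.
                Throwable cause = e.getCause();
                return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndInspect(false)); // prints "result=true"
        System.out.println(runAndInspect(true));  // prints "failed: OperationFailedException: bad input"
    }
}
```

The same pattern should apply to the Future returned by an @Async method: the caller keeps the reference, carries on with other logic, and calls get() only when it needs the outcome.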

    <task:executor id="singleOperationExecutor" pool-size="1"/>

Almost done. The problem is that
@Async
still uses the default executor configured for annotation-driven tasks. So I need to name the executor explicitly on my non-concurrent operation:
@Async("singleOperationExecutor")
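
To see why a one-thread pool gives asynchronous but non-concurrent execution, here is a small JDK-only sketch. It is an analogy, not the Spring configuration itself: Executors.newSingleThreadExecutor() stands in for the singleOperationExecutor bean, and the class SingleThreadOrderSketch and method runSequentially are hypothetical names used only here. The assumption is that queued tasks are picked up in submission order, which holds for the JDK single-thread executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderSketch {

    // Submits each named operation to a one-thread pool and returns the recorded events.
    static List<String> runSequentially(String... names) {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (final String name : names) {
            // submit() returns immediately; the work runs on the pool's only thread.
            single.submit(new Runnable() {
                @Override
                public void run() {
                    events.add("start " + name);
                    try {
                        Thread.sleep(50); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    events.add("end " + name);
                }
            });
        }
        single.shutdown();
        try {
            // Wait for the queued tasks so the caller sees the complete event list.
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [start calculation, end calculation, start transformation, end transformation]
        System.out.println(runSequentially("calculation", "transformation"));
    }
}
```

The second operation starts only after the first has ended, even though neither call blocks the submitting thread.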