Showing posts with label asynchronous. Show all posts
20 June, 2013
Operation Logic Executor - Source code
This is the last post in this series. If you found anything useful, please don't hesitate to leave a comment. You can download the latest version of the source code from GitHub. It differs a little from the code shown in the posts, but the idea stays the same.
Operation Logic Executor - Adding executors
The last version of the locking mechanism provides an easy way to control access to resources just before task execution, but it makes it hard to lock resources at the moment of function invocation. To clarify what I mean, let's look at an example of two tasks executed asynchronously and handled by the same thread pool executor with only one thread.
- Both tasks (T1, T2) go to the blocking queue.
- ResourceLockAspect locks the resources of T1.
- T1 is executed.
- ResourceLockAspect releases the resources of T1.
- ResourceLockAspect locks the resources of T2.
- T2 is executed.
- ResourceLockAspect releases the resources of T2.
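The sequence above can be reproduced without Spring. The following is only a sketch: the class name ExecutionTimeLockDemo and the event strings are made up for illustration, and the "lock" and "release" steps merely stand in for what ResourceLockAspect does around the method call. It shows that both submissions are accepted, because nothing is locked until a task actually starts running.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutionTimeLockDemo {

    /** Runs T1 and T2 on a one-thread pool; locking happens inside each task. */
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String task : new String[] {"T1", "T2"}) {
                // Both submissions succeed: no resource is locked at this point.
                futures.add(pool.submit(() -> {
                    events.add("lock " + task);     // stands in for the aspect, before proceed()
                    events.add("execute " + task);
                    events.add("release " + task);  // stands in for the aspect, after proceed()
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait until the task has finished
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a single worker thread the events always come out in the order of the list above, so T2 is never rejected; it just waits in the queue.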
If T1 and T2 operate on the same resource, the user may want to lock at the moment of invocation, to prevent T2 from being added to the queue. Adding a new wrapping object, on which we lock resources and then perform the operation asynchronously, partially solves the problem. I used the idea of an executor to implement this functionality. The executor is in fact a decorator of OperationLogic.

public interface OperationExecutor<T> extends OperationLogic<T> {
    void setOperationLogic(OperationLogic<T> logic);
    OperationLogic<T> getOperationLogic();
}

Here is an example executor implementation. I set the bean's scope to prototype, but if its only property is an OperationLogic object with singleton scope and this property doesn't change, we could also use singleton scope for the executor.

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TransactionProcessingExecutor implements OperationExecutor<Transaction> {
    private OperationLogic<Transaction> logic;

    @ResourceLock(identifierBuilder = TransactionIdentifierBuilder.class,
                  resourceLocker = TransactionLocker.class,
                  resourceLockerBeanName = "transactionLocker")
    public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
        return logic.executeOperation(data);
    }

    @Autowired
    @Qualifier(value = "transactionProcessingLogic")
    public void setOperationLogic(OperationLogic<Transaction> logic) {
        this.logic = logic;
    }

    public OperationLogic<Transaction> getOperationLogic() {
        return logic;
    }
}

When executeOperation is invoked, resources are locked and then the injected operation is performed. If that operation is asynchronous, the invocation returns at once and the resources are immediately released. This is why I wrote above that the current executor implementation only partially solves the problem. What I'm going to do is define AsyncOperationLogic with a special method, operationFinished, invoked by an aspect when the executeOperation method returns.

public interface AsyncOperationLogic<T> extends OperationLogic<T> {
    void operationFinished();
    void setResourceReleaser(ResourceReleaser releaser);
}

It also contains a setter method to define the object that will take care of releasing resources.
@Aspect
@Component
public class ReleaseResourcesAspect {
    // the logger is named after this aspect, which matches the log output shown later
    private Logger logger = LogManager.getLogger(ReleaseResourcesAspect.class);

    @After("execution(* pl.mariusz.marciniak.operations.logic.AsyncOperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))")
    private void release(JoinPoint jp) {
        logger.debug("releasing resources");
        ((AsyncOperationLogic<?>) jp.getTarget()).operationFinished();
    }
}

Now I need to provide AsyncOperationLogic with the identifiers and the locking mechanism that were used for locking. I also need to skip the unlock in ResourceLockAspect. To achieve this I will extend the @ResourceLock annotation to indicate another bean that will take care of releasing resources

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ResourceLock {
    Class<? extends IdentifierBuilder<?>> identifierBuilder();
    Class<? extends ResourceLocker> resourceLocker();
    String resourceLockerBeanName();
    String releaseResourceBeanName() default "";
}

and change the ResourceLockAspect.execute
method:

@Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))"
        + " && @annotation(resourceLock)")
private <T> Object execute(ProceedingJoinPoint pjp, ResourceLock resourceLock) throws Throwable {
    Object result = null;
    IdentifierBuilder<T> builder = resolveIdentifierBuilder(resourceLock);
    ResourceLocker locker = resolveResourceLocker(resourceLock);
    OperationData<T> operationData = (OperationData<T>) pjp.getArgs()[0];
    long[] objectsToLockIdentifiers = createIdentifiers(builder, operationData);
    // check if a release resource bean is defined; if yes, initialize it
    boolean releaseResourceDefined = resourceLock.releaseResourceBeanName().length() > 0;
    initResourceReleaserOnAsyncOperationLogic(pjp.getTarget(), resourceLock.releaseResourceBeanName(), locker, objectsToLockIdentifiers);
    if (locker.lock(objectsToLockIdentifiers)) {
        try {
            result = pjp.proceed();
        } finally {
            // otherwise the release resource bean will take care of that
            if (!releaseResourceDefined) {
                locker.unlock(objectsToLockIdentifiers);
            }
        }
    } else {
        logger.info("Cannot perform operation " + operationData.getName() + ". Locked resources "
                + new StrBuilder().appendWithSeparators(ArrayUtils.toObject(objectsToLockIdentifiers), ",").toString());
    }
    return result;
}

private void initResourceReleaserOnAsyncOperationLogic(Object target, String resourceReleaseBeanName,
        ResourceLocker locker, long[] objectsToLockIdentifiers) {
    AsyncOperationLogic<?> asyncOperationLogic = null;
    if (target instanceof AsyncOperationLogic<?>) {
        asyncOperationLogic = (AsyncOperationLogic<?>) target;
    } else if (target instanceof OperationExecutor<?>) {
        OperationExecutor<?> operationExecutor = (OperationExecutor<?>) target;
        if (operationExecutor.getOperationLogic() instanceof AsyncOperationLogic<?>) {
            asyncOperationLogic = (AsyncOperationLogic<?>) operationExecutor.getOperationLogic();
        }
    }
    if (asyncOperationLogic != null) {
        ResourceReleaser releaser = appContext.getBean(ResourceReleaser.class);
        releaser.setResourceLocker(locker);
        releaser.setResourcesIdentifiers(objectsToLockIdentifiers);
        asyncOperationLogic.setResourceReleaser(releaser);
    }
}

There are two changes compared to the prior version:
- initResourceReleaserOnAsyncOperationLogic is added to initialize and set the ResourceReleaser object
- if a resource releaser is defined, the aspect doesn't unlock resources
@Component @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class AsyncTransactionProcessingLogic implements AsyncOperationLogic<Transaction> { private Logger logger = LogManager.getLogger(AsyncTransactionProcessingLogic.class); @Autowired @Qualifier(value="transactionProcessingLogic") private OperationLogic<Transaction> logic; private ResourceReleaser releaser; @Async(value="singleOperationExecutor") public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException { return logic.executeOperation(data); } public void operationFinished() { logger.debug("operation finished"); releaser.unlock(); } public void setResourceReleaser(ResourceReleaser releaser) { this.releaser = releaser; } }
and executor which will lock resources
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TransactionProcessingExecutor implements OperationExecutor<Transaction> {
    private OperationLogic<Transaction> logic;

    @ResourceLock(identifierBuilder = TransactionIdentifierBuilder.class,
                  resourceLocker = TransactionLocker.class,
                  resourceLockerBeanName = "transactionLocker")
    public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException {
        return logic.executeOperation(data);
    }

    @Autowired
    @Qualifier(value = "transactionProcessingLogic")
    public void setOperationLogic(OperationLogic<Transaction> logic) {
        this.logic = logic;
    }

    public OperationLogic<Transaction> getOperationLogic() {
        return logic;
    }
}

Executing 3 operations (2 trying to lock transaction 203002322 and 1 transaction 125593085) using a pool with only one thread can produce the following output:
[main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test with immediate lock
[main] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [203002322]
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[main] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [125593085]
[main] INFO pl.mariusz.marciniak.locking.aop.ResourceLockAspect - Cannot perform operation complicated Buy operation on account. Locked resources 203002322
[main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 203002322
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ReleaseResourcesAspect - releasing resources
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.AsyncTransactionProcessingLogic - operation finished
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - releasing lock on [203002322]
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.TransactionProcessingLogic - processing Sell operation on account number: 125593085
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ReleaseResourcesAspect - releasing resources
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.operations.logic.impl.AsyncTransactionProcessingLogic - operation finished
[singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - releasing lock on [125593085]

The Buy operation couldn't access the locked resources at the moment it was invoked, so the functionality works as desired.
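The behaviour visible in this log (lock taken on the calling thread, released only when the asynchronous work ends) can be sketched in plain Java. This is not the Spring-based implementation from the post: InvocationTimeLockDemo, its gate latch and the scenario method are hypothetical names, and returning null for a rejected operation simply mirrors the null result of the aspect when the lock cannot be taken.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

class InvocationTimeLockDemo {
    private final Set<Long> locked = ConcurrentHashMap.newKeySet();
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    // Keeps tasks "busy" until the caller has tried every submission.
    private final CountDownLatch gate = new CountDownLatch(1);

    /** Locks on the caller's thread; returns null when the account is already locked. */
    Future<Boolean> submit(long accountNumber) {
        if (!locked.add(accountNumber)) {
            return null; // rejected before it ever reaches the queue
        }
        try {
            return pool.submit(() -> {
                try {
                    gate.await(); // simulated long-running operation
                    return true;
                } finally {
                    locked.remove(accountNumber); // released when the work ends
                }
            });
        } catch (RejectedExecutionException e) {
            locked.remove(accountNumber);
            throw e;
        }
    }

    /** Returns {buy rejected, both sells completed, retry after release succeeded}. */
    static boolean[] scenario() {
        InvocationTimeLockDemo demo = new InvocationTimeLockDemo();
        try {
            Future<Boolean> sellOnFirst = demo.submit(203002322L);
            Future<Boolean> sellOnSecond = demo.submit(125593085L);
            Future<Boolean> buyOnFirst = demo.submit(203002322L);
            demo.gate.countDown();
            boolean sellsDone = sellOnFirst.get() && sellOnSecond.get();
            Future<Boolean> retry = demo.submit(203002322L);
            return new boolean[] {buyOnFirst == null, sellsDone, retry != null && retry.get()};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            demo.pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(scenario()));
    }
}
```

The second operation on account 203002322 is refused while the first one is still pending, and the same account can be submitted again once the first task has released it.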
22 May, 2013
Operation Logic Executor - Implementation of locking mechanism
In the previous post, I showed that we can easily combine the concepts of asynchronous execution and resource locking. Now I can implement the locking mechanism. I will use the simple idea of storing locked resources' identifiers in a set, but nothing prevents more complex solutions. First of all
@ResourceLock should be configurable and let the user plug in any locking method. That is why I extended the previous @ResourceLock annotation to:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ResourceLock {
    Class<? extends IdentifierBuilder<?>> identifierBuilder();
    Class<? extends ResourceLocker> resourceLocker();
    String resourceLockerBeanName();
}

I added resourceLocker and resourceLockerBeanName to fetch the bean containing the locking logic. The user can also define an identifierBuilder bean, which will create unique identifiers for resources. I didn't add a bean name for the builder, because I don't want programmers to customize the builder logic, while for resourceLocker it can be useful (e.g. a locker inserting blocked identifiers into tables, where the programmer only changes the query for the same type of bean).

public interface IdentifierBuilder<T> {
    long[] getIdentifiers(OperationData<T> operationLogicArgs);
}

I decided to return identifiers as an array of longs. It means that the programmer needs to take care of mapping keys of other types, or complex keys, to long values. I'm aware that such a limitation can be inconvenient and error-prone, but for the purpose of this example I will leave it as it is.

public interface ResourceLocker {
    boolean lock(long... identifiers);
    boolean unlock(long... identifiers);
    boolean isLocked(long identifier);
}

Here, too, it would be much better to throw an exception than to return a boolean value, which gives no information about what went wrong.
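As a rough illustration of the exception-based alternative mentioned above, here is a minimal sketch. ResourceLockedException, ThrowingResourceLocker and ThrowingLockerDemo are hypothetical names that do not appear in the original project; the point is only that the exception can carry the identifier that caused the conflict.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical exception reporting which identifier was already locked. */
class ResourceLockedException extends Exception {
    private final long identifier;

    ResourceLockedException(long identifier) {
        super("Resource " + identifier + " is already locked");
        this.identifier = identifier;
    }

    long getIdentifier() {
        return identifier;
    }
}

/** Hypothetical locker that throws instead of returning false. */
class ThrowingResourceLocker {
    private final Set<Long> lockedResources = new HashSet<>();

    synchronized void lock(long... identifiers) throws ResourceLockedException {
        // First pass: fail before changing anything, so locking is all-or-nothing.
        for (long identifier : identifiers) {
            if (lockedResources.contains(identifier)) {
                throw new ResourceLockedException(identifier);
            }
        }
        for (long identifier : identifiers) {
            lockedResources.add(identifier);
        }
    }

    synchronized void unlock(long... identifiers) {
        for (long identifier : identifiers) {
            lockedResources.remove(identifier);
        }
    }

    synchronized boolean isLocked(long identifier) {
        return lockedResources.contains(identifier);
    }
}

class ThrowingLockerDemo {
    /** Returns -1 when the lock was taken, otherwise the conflicting identifier. */
    static long tryLock(ThrowingResourceLocker locker, long... identifiers) {
        try {
            locker.lock(identifiers);
            return -1L;
        } catch (ResourceLockedException e) {
            return e.getIdentifier();
        }
    }

    public static void main(String[] args) {
        ThrowingResourceLocker locker = new ThrowingResourceLocker();
        System.out.println(tryLock(locker, 1L, 2L));
        System.out.println(tryLock(locker, 3L, 2L));
    }
}
```

With this shape the caller learns which resource blocked the operation, something the boolean result cannot express.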
Before implementing the interfaces, let's present the resource class.
public class Transaction { private long accountNumber; private String transactionType; public long getAccountNumber() { return accountNumber; } public void setAccountNumber(long accountNumber) { this.accountNumber = accountNumber; } public String getTransactionType() { return transactionType; } public void setTransactionType(String transactionType) { this.transactionType = transactionType; } }For objects of this type I define operation similar to one used for strings processing from prior post:
@Component public class TransactionProcessingLogic implements OperationLogic<Transaction> { private Logger logger = LogManager.getLogger(TransactionProcessingLogic.class); @ResourceLock( identifierBuilder=TransactionIdentifierBuilder.class, resourceLocker=TransactionLocker.class, resourceLockerBeanName="transactionLocker" ) @Async public Future<Boolean> executeOperation(OperationData<Transaction> data) throws OperationFailedException { boolean result = false; try { Object obj = data.getValueObject(); if (obj instanceof Transaction) { Transaction transaction = (Transaction) obj; long startTime = System.currentTimeMillis(); do { logger.debug("processing " + transaction.getTransactionType() + " operation on account number: " + transaction.getAccountNumber()); Thread.sleep(500); } while (startTime + 10000 > System.currentTimeMillis()); result = true; } } catch(InterruptedException e) { logger.error(e); } return new AsyncResult<Boolean>(result); } }Only transactions for different accounts can be operated simultaneously. So identifier builder can provide account number.
@Component public class TransactionIdentifierBuilder implements IdentifierBuilder<Transaction> { public long[] getIdentifiers(OperationData<Transaction> operationData) { return new long[] {operationData.getValueObject().getAccountNumber()}; } }Having identifiers I can implement ResourceLocker.
@Component
public class TransactionLocker implements ResourceLocker {
    private final Logger logger = LogManager.getLogger(TransactionLocker.class);
    private Set<Long> lockedResources;

    @PostConstruct
    private void init() {
        lockedResources = new HashSet<Long>();
    }

    public boolean lock(long... identifiers) {
        Set<Long> resourcesIdentifiers = new HashSet<Long>();
        synchronized (lockedResources) {
            for (long identifier : identifiers) {
                if (lockedResources.contains(identifier)) {
                    return false;
                }
                resourcesIdentifiers.add(identifier);
            }
            logger.debug("acquiring lock on " + resourcesIdentifiers);
            lockedResources.addAll(resourcesIdentifiers);
        }
        return true;
    }

    public boolean unlock(long... identifiers) {
        Set<Long> resourcesIdentifiers = new HashSet<Long>();
        synchronized (lockedResources) {
            for (long identifier : identifiers) {
                if (lockedResources.contains(identifier)) {
                    resourcesIdentifiers.add(identifier);
                }
            }
            logger.debug("releasing lock on " + resourcesIdentifiers);
            lockedResources.removeAll(resourcesIdentifiers);
        }
        return true;
    }

    public boolean isLocked(long identifier) {
        // HashSet is not thread-safe, so reads take the same monitor as writes
        synchronized (lockedResources) {
            return lockedResources.contains(identifier);
        }
    }
}

As I wrote before, this simple implementation is based on storing object identifiers in a set. I moved the logging that was initially in the aspect into this class. In fact, there are many more changes in the aspect code.
@Aspect @Component public class ResourceLockAspect { private Logger logger = LogManager.getLogger(ResourceLockAspect.class); @Autowired private ApplicationContext appContext; @Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))" +" && @annotation(pl.mariusz.marciniak.locking.annotations.ResourceLock)") private <T> Object execute(ProceedingJoinPoint pjp) throws Throwable { Object result = null; ResourceLock resourceLock = fetchAnnotationData(pjp); IdentifierBuilder<T> builder = resolveIdentifierBuilder(resourceLock); ResourceLocker locker = resolveResourceLocker(resourceLock); OperationData<T> operationData = (OperationData<T>) pjp.getArgs()[0]; long[] objectsToLockIdentifiers = createIdentifiers(builder, operationData); if (locker.lock(objectsToLockIdentifiers)) { result = pjp.proceed(); locker.unlock(objectsToLockIdentifiers); } else { logger.info("Cannot perform operation " + operationData.getName() + ". Locked resources " + new StrBuilder().appendWithSeparators(ArrayUtils.toObject(objectsToLockIdentifiers), ",").toString()); } return result; } private <T> long[] createIdentifiers(IdentifierBuilder<T> builder, OperationData<T> operationData) { return builder.getIdentifiers(operationData); } private <T> IdentifierBuilder<T> resolveIdentifierBuilder(ResourceLock annotation) { Class<? extends IdentifierBuilder<T>> identifierBuilder = (Class<? extends IdentifierBuilder<T>>) annotation.identifierBuilder(); return appContext.getBean(identifierBuilder); } private ResourceLocker resolveResourceLocker(ResourceLock annotation) { Class<? 
extends ResourceLocker> resourceLockerClass = annotation.resourceLocker(); String resourceLockerBeanName = annotation.resourceLockerBeanName(); return appContext.getBean(resourceLockerBeanName, resourceLockerClass); } private ResourceLock fetchAnnotationData(ProceedingJoinPoint pjp) throws NoSuchMethodException { MethodSignature signature = (MethodSignature) pjp.getSignature(); String methodName = signature.getMethod().getName(); Class<?>[] parameterTypes = signature.getMethod().getParameterTypes(); ResourceLock annotation = pjp.getTarget().getClass().getMethod(methodName, parameterTypes).getAnnotation(ResourceLock.class); return annotation; } }
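The fetchAnnotationData helper at the end of the aspect looks the method up on the target's class instead of using the method from the signature directly. One plausible reason, which is my assumption and not stated in the post, is that the signature's method may be the interface method, and a method annotation placed on an implementation is not visible there. The plain-Java sketch below demonstrates only that reflection fact; DemoLock, Operation, AnnotatedOperation and AnnotationLookupDemo are made-up names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoLock {
    String value();
}

interface Operation {
    String execute(String data);
}

class AnnotatedOperation implements Operation {
    @DemoLock("demoLocker") // the annotation lives on the implementation only
    public String execute(String data) {
        return data;
    }
}

class AnnotationLookupDemo {
    /** Returns the annotation value found on type.execute(String), or null if absent. */
    static String lockerNameOn(Class<?> type) {
        try {
            Method method = type.getMethod("execute", String.class);
            DemoLock annotation = method.getAnnotation(DemoLock.class);
            return annotation == null ? null : annotation.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lockerNameOn(Operation.class));
        System.out.println(lockerNameOn(AnnotatedOperation.class));
    }
}
```

Looking the method up by name and parameter types on the concrete class, as the aspect does, finds the annotation; asking the interface method does not.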
First of all, I changed the pointcut of my aspect. Previously the advice was executed for all methods annotated with @ResourceLock. Now the method must additionally be named executeOperation, take a pl.mariusz.marciniak.operations.data.OperationData parameter and belong to an object implementing pl.mariusz.marciniak.operations.logic.OperationLogic. At the start of execute I fetch the annotation from the method declaration and then use it to retrieve the builder and locker beans. Using the builder bean I can turn the objects stored in the operationData argument into an array of identifiers. These are passed to the locking mechanism.
The methods for fetching beans and creating identifiers are straightforward, so I won't describe them. More interesting is the annotation fetching. I get the method name and parameter types from the join point and then find the matching method on the target class, which also provides the annotation. In fact, there is an easier way to get the annotation data of a join point in Spring.
@Around("execution(* pl.mariusz.marciniak.operations.logic.OperationLogic.executeOperation(pl.mariusz.marciniak.operations.data.OperationData))"
        + " && @annotation(resourceLock)")
private <T> Object execute(ProceedingJoinPoint pjp, ResourceLock resourceLock) throws Throwable {

In this case the annotation is added to the advice method's parameters, and the pointcut definition is changed to refer to this additional parameter: @annotation(resourceLock).
Let's check if the locking mechanism works correctly. I used an executor with a pool size of 5 and the following data:
private void testAsyncTransaction() throws OperationFailedException { logger.info("starting asynchronous test"); OperationLogic<Transaction> logic = appContext.getBean("transactionProcessingLogic",OperationLogic.class); OperationData<Transaction> sellTransactionDataForAccount1 = prepareOperationDataForTransaction(prepareTransaction(SELL_TRANSACTION, ACCOUNT_1)); OperationData<Transaction> sellTransactionDataForAccount2 = prepareOperationDataForTransaction(prepareTransaction(SELL_TRANSACTION, ACCOUNT_2)); OperationData<Transaction> buyTransactionDataForAccount1 = prepareOperationDataForTransaction(prepareTransaction(BUY_TRANSACTION, ACCOUNT_1)); logic.executeOperation(sellTransactionDataForAccount1); logic.executeOperation(sellTransactionDataForAccount2); logic.executeOperation(buyTransactionDataForAccount1); logger.info("executing other logic ..."); } private OperationData<Transaction> prepareOperationDataForTransaction(Transaction transaction) { OperationData<Transaction> transactionData = new TransactionData("complicated "+transaction.getTransactionType()+" operation on account"); transactionData.setValueObject(transaction); return transactionData; } private Transaction prepareTransaction(String transactionType, long accountNumber) { Transaction transaction = new Transaction(); transaction.setAccountNumber(accountNumber); transaction.setTransactionType(transactionType); return transaction; }
Possible output is:
22:36:29.954 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
22:36:29.960 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
22:36:29.968 [operationExecutor-3] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [203002322]
22:36:29.968 [operationExecutor-2] DEBUG pl.mariusz.marciniak.locking.lockers.TransactionLocker - acquiring lock on [125593085]
22:36:29.968 [operationExecutor-3] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Buy operation on account number: 203002322
22:36:29.968 [operationExecutor-2] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Sell operation on account number: 125593085
22:36:29.980 [operationExecutor-1] INFO pl.mariusz.marciniak.locking.aop.ResourceLockAspect - Cannot perform operation complicated Sell operation on account. Locked resources 203002322
22:36:30.468 [operationExecutor-3] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Buy operation on account number: 203002322
22:36:30.469 [operationExecutor-2] DEBUG pl.mariusz.marciniak.operations.logic.TransactionProcessingLogic - processing Sell operation on account number: 125593085
...

The Sell operation at line 7 was blocked as expected.
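The same experiment can be approximated without Spring. The sketch below is an assumption-laden stand-in, not the project's code: SetBasedLocker imitates the set-based TransactionLocker, ConcurrentLockDemo is a hypothetical driver, and a latch keeps every lock held until all tasks have made their attempt so that the outcome does not depend on thread timing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SetBasedLocker {
    private final Set<Long> locked = new HashSet<>();

    synchronized boolean lock(long... identifiers) {
        for (long identifier : identifiers) {
            if (locked.contains(identifier)) {
                return false;
            }
        }
        for (long identifier : identifiers) {
            locked.add(identifier);
        }
        return true;
    }

    synchronized void unlock(long... identifiers) {
        for (long identifier : identifiers) {
            locked.remove(identifier);
        }
    }
}

class ConcurrentLockDemo {
    /** Runs one task per account (at most 5) and counts how many were blocked. */
    static int countBlocked(long... accounts) {
        SetBasedLocker locker = new SetBasedLocker();
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CountDownLatch attempted = new CountDownLatch(accounts.length);
        AtomicInteger blocked = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (long account : accounts) {
                futures.add(pool.submit(() -> {
                    boolean acquired = locker.lock(account);
                    attempted.countDown();
                    if (!acquired) {
                        blocked.incrementAndGet(); // "Cannot perform operation ..."
                        return;
                    }
                    try {
                        attempted.await(); // hold the lock until everyone has tried
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        locker.unlock(account);
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return blocked.get();
    }

    public static void main(String[] args) {
        System.out.println(countBlocked(203002322L, 125593085L, 203002322L));
    }
}
```

Which of the two operations on account 203002322 gets blocked depends on scheduling, as in the log above, but exactly one of them is always refused.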
28 April, 2013
Operation Logic Executor - Combine fake locking with asynchronous execution
Combining both solutions is trivial since I used aspects for them. I need to add
@ResourceLock
to my asynchronous method.
@Component
public class StringProcessingLogic implements OperationLogic<String> {
    private final Logger logger = LogManager.getLogger(StringProcessingLogic.class);

    @ResourceLock
    @Async("singleOperationExecutor")
    public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
        boolean result = false;
        try {
            long startTime = System.currentTimeMillis();
            do {
                logger.info("processing " + data);
                Thread.sleep(500);
            } while (startTime + 10000 > System.currentTimeMillis());
            result = true;
        } catch (InterruptedException e) {
            logger.error(e);
        }
        return new AsyncResult<Boolean>(result);
    }
}

Let's check the result:
20:37:10.603 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test 20:37:10.629 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ... 20:37:10.635 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - acquiring lock on really long calculations::calculation data 20:37:10.635 [singleOperationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data ... 20:37:20.137 [singleOperationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data 20:37:20.638 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - releasing lock on really long calculations::calculation data 20:37:20.638 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - acquiring lock on transformation::transformed data 20:37:20.638 [singleOperationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data ... 20:37:30.138 [singleOperationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data 20:37:30.638 [singleOperationExecutor-1] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - releasing lock on transformation::transformed data
I've used an executor with a single thread, so the result is as desired. The most important thing is that asynchronous execution takes precedence over locking. That is very good: otherwise the resources would be locked, the asynchronously invoked method would return immediately, and the resources would be released before the asynchronous execution finished. So that's REALLY great. Nice. Yeah, but why?
What exactly defines the order in which different aspects are invoked at the same join point? Well, the Spring documentation states:
When two pieces of advice defined in different aspects both need to run at the same join point, unless you specify otherwise the order of execution is undefined. You can control the order of execution by specifying precedence. This is done in the normal Spring way by either implementing the org.springframework.core.Ordered interface in the aspect class or annotating it with the Order annotation. Given two aspects, the aspect returning the lower value from Ordered.getValue() (or the annotation value) has the higher precedence.
For test purposes I will change my locking annotation's order so that it is executed before @Async:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Order(Ordered.HIGHEST_PRECEDENCE)
public @interface ResourceLock {
}

In fact, setting @ResourceLock to have the highest precedence (@Order equal to the lowest integer) doesn't change the application's behaviour. This is because AsyncAnnotationBeanPostProcessor has its beforeExistingAdvisors property set to true and always adds the @Async advisor before the others. According to the Spring creators:
@Async
always needs to be the first Advisor in the chain in order to provide meaningful around-invocation semantics
It means that Spring provides correct ordering and I don't need to change anything in my code.
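Why the order matters can be shown with a small Spring-free sketch. Everything here is hypothetical (AdviceOrderDemo, the held flag standing in for a lock, the latch that lets the caller move on first); it models the two possible nestings by hand and reports whether the "lock" was still held while the asynchronous work ran.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

class AdviceOrderDemo {

    /** Locking wraps the asynchronous call: the lock is gone before the work runs. */
    static boolean lockOutsideAsync() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean held = new AtomicBoolean();
        CountDownLatch callerMovedOn = new CountDownLatch(1);
        try {
            held.set(true);                        // locking advice, before proceed()
            Future<Boolean> work = pool.submit(() -> {
                callerMovedOn.await();             // the work is still running later on
                return held.get();
            });
            held.set(false);                       // proceed() returned at once, so unlock
            callerMovedOn.countDown();
            return work.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** The asynchronous call wraps locking: the lock covers the whole work. */
    static boolean asyncOutsideLock() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean held = new AtomicBoolean();
        try {
            Future<Boolean> work = pool.submit(() -> {
                held.set(true);                    // locking advice runs on the worker thread
                try {
                    return held.get();
                } finally {
                    held.set(false);
                }
            });
            return work.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("lock outside async, held during work: " + lockOutsideAsync());
        System.out.println("async outside lock, held during work: " + asyncOutsideLock());
    }
}
```

Only the second nesting, which corresponds to the @Async advisor running first, keeps the resource protected for the duration of the operation.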
Operation Logic Executor - Spring Aspects
I want to base my locking mechanism on Spring aspects. So first, let's briefly introduce a simple example of using AOP. This aspect will be executed for all methods annotated with the ResourceLock annotation.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ResourceLock {
}

The next preparation step is to add the spring-aspects artifact to the dependencies from the previous post.

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>3.2.2.RELEASE</version>
</dependency>

Now I can implement the aspect.

@Aspect
@Component
public class ResourceLockAspect {
    private Logger logger = LogManager.getLogger(ResourceLockAspect.class);

    @Around("@annotation(pl.mariusz.marciniak.locking.annotations.ResourceLock)")
    private Object execute(ProceedingJoinPoint pjp) throws Throwable {
        logger.debug("acquiring lock on " + pjp.getArgs()[0]);
        Object result = pjp.proceed();
        logger.debug("releasing lock on " + pjp.getArgs()[0]);
        // hand the advised method's return value back to the caller
        return result;
    }
}

The around advice type means that the advice surrounds the method invocation. I need that to lock resources and then release them after the method execution has finished. As defined above, the pointcut expression invokes the advice for each method annotated with ResourceLock. So let's define a bean with such a method.
@Component public class ObjectUsingFakeLocking { private final Logger logger = LogManager.getLogger(ObjectUsingFakeLocking.class); public void executeNormalMethod(String parameter) { logger.info("executeNormalMethod - " + parameter); } @ResourceLock public void executeMethodLockingResources(String parameter) { logger.info("executeMethodLockingResources - " + parameter); } }Execution of both
ObjectUsingFakeLocking
methods
private void testFakeLocking() { ObjectUsingFakeLocking obj = appContext.getBean(ObjectUsingFakeLocking.class); String param = "parameter"; obj.executeNormalMethod(param); obj.executeMethodLockingResources(param); }prints
19:40:34.192 [main] INFO pl.mariusz.marciniak.locking.ObjectUsingFakeLocking - executeNormalMethod - parameter
19:40:34.198 [main] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - acquiring lock on parameter
19:40:34.198 [main] INFO pl.mariusz.marciniak.locking.ObjectUsingFakeLocking - executeMethodLockingResources - parameter
19:40:34.198 [main] DEBUG pl.mariusz.marciniak.locking.aop.ResourceLockAspect - releasing lock on parameter

Okay, so my aspect works as desired. The next step is to combine it with asynchronous execution.
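For readers who want to see the "around" idea without any framework, here is a sketch built on a JDK dynamic proxy. It is not how the post's aspect is implemented: FakeLock, Greeter, SimpleGreeter and AroundProxyDemo are invented for this illustration, and the annotation sits on the interface method because that is the method a JDK proxy hands to its handler.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface FakeLock {
}

interface Greeter {
    String plain(String parameter);

    @FakeLock
    String guarded(String parameter);
}

class SimpleGreeter implements Greeter {
    public String plain(String parameter) {
        return "plain " + parameter;
    }

    public String guarded(String parameter) {
        return "guarded " + parameter;
    }
}

class AroundProxyDemo {
    static final List<String> log = new ArrayList<>();

    /** Wraps the target so that @FakeLock methods are surrounded by log entries. */
    static Greeter wrap(Greeter target) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                (proxy, method, args) -> {
                    boolean advised = method.isAnnotationPresent(FakeLock.class);
                    if (advised) {
                        log.add("acquiring lock on " + args[0]);
                    }
                    try {
                        return method.invoke(target, args); // plays the role of proceed()
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    } finally {
                        if (advised) {
                            log.add("releasing lock on " + args[0]);
                        }
                    }
                });
    }

    public static void main(String[] args) {
        Greeter greeter = wrap(new SimpleGreeter());
        greeter.plain("parameter");
        greeter.guarded("parameter");
        log.forEach(System.out::println);
    }
}
```

Only the annotated method produces the two surrounding log entries, which mirrors the output above.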
18 April, 2013
Operation Logic Executor - Asynchronous execution
So let's start with something really easy. I will add asynchronous execution to the
OperationLogic.executeOperation method. The current declaration of this method doesn't return any value, but throws OperationFailedException. It means that I cannot check what caused the failure when the operation is executed asynchronously. That's why I chose to change the executeOperation method to:

public interface OperationLogic<T> {
    Future<Boolean> executeOperation(OperationData<T> data) throws OperationFailedException;
}

Now I can check the thrown exception using the Future object. I also added generics to avoid casting. Before implementing the first sample class of OperationLogic, I need to add some dependencies to my Maven project
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>3.2.2.RELEASE</version>
    <type>jar</type>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>3.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
    <version>3.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.0-beta4</version>
    <type>jar</type>
</dependency>
Okay I added core, context and aspects Spring jars required to use
@Async
and additionally log4j library for logging.
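As a side note on the Future return type introduced above: with a plain java.util.concurrent executor, a failure inside the asynchronous work surfaces from Future.get() wrapped in an ExecutionException. The sketch below shows only that standard-library behaviour; FutureFailureDemo and the message text are made up, and an IllegalStateException stands in for OperationFailedException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureFailureDemo {

    /** Runs a failing operation asynchronously and returns the failure message seen by the caller. */
    static String failureMessage() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for an operation that fails on the worker thread.
            Callable<Boolean> failing = () -> {
                throw new IllegalStateException("operation failed");
            };
            Future<Boolean> future = pool.submit(failing);
            try {
                future.get();
                return null; // not reached: the operation always fails
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage());
    }
}
```

The caller keeps running after submitting the work and can inspect the cause of a failure whenever it decides to call get().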
import java.util.concurrent.Future;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

@Component
public class StringProcessingLogic implements OperationLogic<String> {

    private final Logger logger = LogManager.getLogger(StringProcessingLogic.class);

    public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
        boolean result = false;
        try {
            long startTime = System.currentTimeMillis();
            // Simulate about 10 seconds of work, logging every 500 ms.
            do {
                logger.info("processing " + data);
                Thread.sleep(500);
            } while (startTime + 10000 > System.currentTimeMillis());
            result = true;
        } catch (InterruptedException e) {
            logger.error(e);
        }
        // Wrap the plain value so the method can return a Future.
        return new AsyncResult<Boolean>(result);
    }
}
Now I only need to define the packages that Spring should scan for beans:
<context:component-scan base-package="pl.mariusz.marciniak.async" />
and then I can test the current solution. Here is the code that I executed:
private void testAsyncString() throws OperationFailedException {
    logger.info("starting asynchronous test");
    OperationLogic<String> logic = (OperationLogic<String>) appContext.getBean("stringProcessingLogic", OperationLogic.class);
    OperationData<String> calculationData = new OperationData<String>("really long calculations");
    calculationData.setValueObject("calculation data");
    logic.executeOperation(calculationData);
    OperationData<String> transformationData = new OperationData<String>("transformation");
    transformationData.setValueObject("transformed data");
    logic.executeOperation(transformationData);
    logger.info("executing other logic ...");
}
If the OperationData.toString method is overridden, you can get output similar to this one:
23:21:28.773 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
23:21:28.775 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:21:29.275 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
...
23:21:38.275 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:21:38.777 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:21:39.277 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...
23:21:48.277 [main] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:21:48.777 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
What we have here is standard synchronous execution: everything runs on the main thread, one operation after the other. To make it asynchronous I need to create a ThreadPoolTaskExecutor. This is easily achieved with Spring by adding a definition of ThreadPoolTaskExecutor to the configuration file and attaching this pool to be used by the @Async annotation:
<task:executor id="operationExecutor" pool-size="5"/>
<task:annotation-driven executor="operationExecutor" />
Then I annotate StringProcessingLogic.executeOperation
@Async
public Future<Boolean> executeOperation(OperationData<String> data) throws OperationFailedException {
    ...
}
and run my test one more time:
23:46:26.887 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - starting asynchronous test
23:46:26.892 [main] INFO pl.mariusz.marciniak.async.AsyncMainTest - executing other logic ...
23:46:26.892 [operationExecutor-2] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
23:46:26.892 [operationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:46:27.392 [operationExecutor-1] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing really long calculations::calculation data
23:46:27.392 [operationExecutor-2] INFO pl.mariusz.marciniak.async.StringProcessingLogic - processing transformation::transformed data
...
As we can see, the executeOperation calls returned immediately; that is why “executing other logic…” appears right after “starting asynchronous test”. Also, the transformation and calculation operations run in parallel and are no longer executed by the main thread.
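The test above ignores the Future returned by executeOperation. As a minimal sketch of how a caller could use it, the example below relies on plain java.util.concurrent instead of Spring: a fixed pool of 5 threads stands in for operationExecutor, and the class name FutureResultDemo, the task bodies and the messages are made up for illustration. I would expect a Future returned from an @Async method to be read with get() in the same way, but treat that as an assumption to verify against your Spring version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it plays the role of code calling executeOperation.
class FutureResultDemo {

    // Blocks until the task is done and describes its outcome.
    static String describe(Future<Boolean> future) {
        try {
            return "finished with result " + future.get();
        } catch (ExecutionException e) {
            // The exception thrown inside the task is exposed as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Submits one successful and one failing task, then inspects both futures.
    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            Future<Boolean> ok = pool.submit(new Callable<Boolean>() {
                public Boolean call() {
                    return Boolean.TRUE;
                }
            });
            Future<Boolean> broken = pool.submit(new Callable<Boolean>() {
                public Boolean call() throws Exception {
                    throw new IllegalStateException("transformation failed");
                }
            });
            List<String> outcomes = new ArrayList<String>();
            outcomes.add(describe(ok));
            outcomes.add(describe(broken));
            return outcomes;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String outcome : runDemo()) {
            System.out.println(outcome);
        }
    }
}
```

Note that get() blocks the calling thread, so it only makes sense at the point where the caller really needs the result.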
What if, for some operation, I want asynchronous execution, but not concurrent? Of course I can define a pool with only one thread. So let's add another executor pool:
<task:executor id="singleOperationExecutor" pool-size="1"/>
Almost done. The problem is that @Async is still using the default executor configured for annotation-driven tasks, so I need to name the executor explicitly on my non-concurrent operation:
@Async("singleOperationExecutor")
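To illustrate why a one-thread pool gives asynchronous but not concurrent execution, here is a small sketch using plain java.util.concurrent rather than the Spring configuration above. Executors.newSingleThreadExecutor stands in for singleOperationExecutor; the class name SingleThreadOrderDemo and the task names are hypothetical. Both tasks are submitted without waiting, yet the second one starts only after the first has finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class SingleThreadOrderDemo {

    // Submits two tasks to a one-thread executor and records their start/end events.
    static List<String> run() {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (final String name : new String[] {"calculation", "transformation"}) {
            // submit() returns immediately; the task waits in the executor's queue.
            single.submit(new Runnable() {
                public void run() {
                    events.add(name + " start");
                    try {
                        Thread.sleep(100); // simulate some work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    events.add(name + " end");
                }
            });
        }
        single.shutdown();
        try {
            // Wait for both queued tasks before reading the recorded events.
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(events);
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

With a pool of several threads the start and end events of the two tasks could interleave; with a single worker thread they cannot.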
17 April, 2013
Operation Logic Executor - Assignment Description
"Operation Logic Executor" will be a series of posts about a task for which I need to prepare a proof of concept.
Assignment Details
The system contains various operations defined in classes implementing
OperationLogic interface. Based on the provided data, it chooses the correct operation and executes it. Currently all executions are synchronous. I need to add asynchronous execution and a configurable locking mechanism, using Spring Framework. The declaration of OperationLogic can be changed.
public interface OperationLogic {
    void executeOperation(OperationData data) throws OperationFailedException;
}
Let's start...