14 March 2024
Event Exception Handler
GigaSpaces provides a mechanism for hooking into how exceptions raised by event listeners are handled, specifically when the event listeners are executed in the context of a transaction.
An event exception handler should implement the following interface:
public interface EventExceptionHandler<T> {
/**
* A callback invoked after a successful execution of a listener.
*
* @param data The actual data object of the event
* @param gigaSpace A GigaSpace instance that can be used to perform additional operations against the
* space
* @param txStatus An optional transaction status allowing to rollback a transaction programmatically
* @param source Optional additional data or the actual source event data object (where relevant)
*/
void onSuccess(T data, GigaSpace gigaSpace,
TransactionStatus txStatus, Object source) throws RuntimeException;
/**
* A callback to handle exception in an event container. The handler can either handle the exception
* or propagate it.
*
* <p>If the event container is transactional, then propagating the exception will cause the transaction to
* roll back, while handling it will cause the transaction to commit.
*
* <p>The TransactionStatus can also be used to control if the transaction
* should be rolled back without throwing an exception.
*
* @param exception The listener thrown exception
* @param data The actual data object of the event
* @param gigaSpace A GigaSpace instance that can be used to perform additional operations against the
* space
* @param txStatus An optional transaction status allowing to rollback a transaction programmatically
* @param source Optional additional data or the actual source event data object (where relevant)
*/
void onException(ListenerExecutionFailedException exception, T data,
GigaSpace gigaSpace, TransactionStatus txStatus, Object source) throws RuntimeException;
}
Consider the following simple implementation of the EventExceptionHandler interface:
public class SimpleEventExceptionHandler implements EventExceptionHandler<Object> {
public void onSuccess(Object data, GigaSpace gigaSpace,
TransactionStatus txStatus, Object source) throws RuntimeException {
// process success
}
public void onException(ListenerExecutionFailedException exception, Object data,
GigaSpace gigaSpace, TransactionStatus txStatus, Object source) throws RuntimeException {
// process failure
}
}
Here is how it can be configured:
@EventDriven @Polling
public class SimpleListener {
@ExceptionHandler
public EventExceptionHandler exceptionHandler() {
// can return this if SimpleListener itself implements EventExceptionHandler
return new SimpleEventExceptionHandler();
}
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
// process Data here, then mark it so it no longer matches the template
event.setProcessed(true);
return event;
}
}
Using the Event Exception Handler
One of the main use cases for an exception handler is filtering out poison messages. For example, if a transactional polling container cannot process the event data (the message), it will retry the message indefinitely: it starts a transaction, performs a take, tries to process the message, throws an exception in the process, and rolls back the transaction, which also rolls back the take operation.
If the type of the exception is known to be unrecoverable, an exception handler can be registered that checks the exception type (the cause of the ListenerExecutionFailedException) and, on detecting it, does not rethrow the exception. Instead, it writes the entry, wrapped in a "Poison Message" entry, back into the space, creating a dead letter queue that can be processed later on.
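The cause check described above might look like the following minimal sketch. It uses only plain Java so that it runs without GigaSpaces on the classpath: `ListenerFailure` stands in for `ListenerExecutionFailedException`, and `UnrecoverableDataException`, `PoisonMessage`, and the in-memory `deadLetters` list are hypothetical names invented for illustration. In a real handler, the wrapped entry would presumably be written with the `GigaSpace` instance passed to `onException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListenerExecutionFailedException (an assumption, not the GigaSpaces class).
class ListenerFailure extends RuntimeException {
    ListenerFailure(Throwable cause) { super(cause); }
}

// Hypothetical exception type that the application treats as unrecoverable.
class UnrecoverableDataException extends RuntimeException {
    UnrecoverableDataException(String message) { super(message); }
}

// Hypothetical wrapper entry; a collection of these forms the dead letter queue.
class PoisonMessage {
    final Object payload;
    final String reason;
    PoisonMessage(Object payload, String reason) {
        this.payload = payload;
        this.reason = reason;
    }
}

class PoisonMessageSketch {
    // Stands in for the space; a real handler would write to the space instead.
    static final List<PoisonMessage> deadLetters = new ArrayList<>();

    // Plays the role of onException: returning normally means "handled" (commit),
    // rethrowing means "propagate" (rollback, so the message is retried).
    static void onException(ListenerFailure failure, Object data) {
        Throwable cause = failure.getCause();
        if (cause instanceof UnrecoverableDataException) {
            deadLetters.add(new PoisonMessage(data, cause.getMessage()));
            return; // swallow the exception so the take is committed
        }
        throw failure; // possibly recoverable: let the container roll back and retry
    }

    public static void main(String[] args) {
        onException(new ListenerFailure(new UnrecoverableDataException("bad format")), "order-1");
        System.out.println("dead letters: " + deadLetters.size());
        try {
            onException(new ListenerFailure(new IllegalStateException("temporary failure")), "order-2");
        } catch (ListenerFailure e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
```

Only the unrecoverable cause is diverted to the dead letter list; any other failure is rethrown, which in a transactional container corresponds to the rollback-and-retry path.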
A retry counter can also be handled by creating a generic interface, for example called RetryMessageEntry, which certain messages will implement. That interface allows a counter to be incremented and reset. The counter field will be part of the entry (i.e. persisted in the space).
When an exception occurs, the retry counter will be incremented. If it is under a specific threshold, the data object will be written back to the space with the incremented counter (causing it to be taken again by the polling container). No exception will be raised in this case, as we want the transaction to be committed with the updated counter. Another option is to write a new entry with an updated counter, if the listener might have had side effects that the transaction should not commit.
If the threshold has been breached, the same poison message handling described above can be applied.
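The retry counter logic can be sketched as follows. This is a plain-Java illustration under stated assumptions, not GigaSpaces code: the method names on `RetryMessageEntry`, the `OrderMessage` class, the `MAX_RETRIES` threshold of 3, and the two in-memory lists standing in for the space are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface from the text: messages that carry their own retry counter.
interface RetryMessageEntry {
    int getRetryCounter();
    void incrementRetryCounter();
    void resetRetryCounter();
}

// Hypothetical message type; in a real application the counter would be a
// property of the space entry so that it is persisted in the space.
class OrderMessage implements RetryMessageEntry {
    private int retryCounter;
    public int getRetryCounter() { return retryCounter; }
    public void incrementRetryCounter() { retryCounter++; }
    public void resetRetryCounter() { retryCounter = 0; }
}

class RetryCounterSketch {
    static final int MAX_RETRIES = 3; // assumed threshold

    // In-memory stand-ins for entries written back to the space.
    static final List<RetryMessageEntry> requeued = new ArrayList<>();
    static final List<RetryMessageEntry> deadLetters = new ArrayList<>();

    // Plays the role of onException. It never rethrows, so in a transactional
    // container the transaction would commit together with the updated counter.
    static void onException(RetryMessageEntry entry) {
        entry.incrementRetryCounter();
        if (entry.getRetryCounter() < MAX_RETRIES) {
            requeued.add(entry);    // under the threshold: write back for another attempt
        } else {
            deadLetters.add(entry); // threshold reached: treat as a poison message
        }
    }

    public static void main(String[] args) {
        OrderMessage message = new OrderMessage();
        for (int i = 0; i < MAX_RETRIES; i++) {
            onException(message);
        }
        System.out.println("requeued=" + requeued.size() + ", deadLetters=" + deadLetters.size());
    }
}
```

Keeping the counter on the entry itself, rather than in handler state, means the count survives across transactions and across container instances.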
It is important to note that the exception handler's onException and onSuccess callbacks operate under the ongoing transaction started by the polling container. To perform an operation outside of that transaction, use a GigaSpace instance that is not associated with a transaction manager.
Example
Here is an example where we create Purchase Orders that need to be processed. The NewOrderProcessor will try to process the Purchase Order. If the processing fails, it will retry two times. If it still can't process the order, a PoProcessException is thrown, which will be handled by the PoEventExceptionHandler.
The PoEventExceptionHandler will update the state of the PurchaseOrder to UNPROCESSABLE and write it back into the Space. The order will not be seen by the NewOrderProcessor again, since its template specifies the state to be NEW.
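import java.util.UUID;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.EmbeddedSpaceConfigurer;
import org.openspaces.events.polling.SimplePollingContainerConfigurer;
import org.openspaces.events.polling.SimplePollingEventListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.j_spaces.core.client.SQLQuery;

public class Program {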
private static Logger logger = LoggerFactory
.getLogger(Program.class);
public static void main(String[] args) throws InterruptedException {
GigaSpace space = new GigaSpaceConfigurer(new EmbeddedSpaceConfigurer(
"sandboxSpace")).gigaSpace();
// Register the Processor
registerListener(space);
Thread.sleep(1000);
// Create a Purchase Order
PurchaseOrder po = new PurchaseOrder();
//po.setNumber("12345");
po.setId(UUID.randomUUID());
po.setState(EPurchaseOrderState.NEW);
// Write it into the Space
space.write(po);
Thread.sleep(1000);
// Read all unprocessable POs
SQLQuery<PurchaseOrder> query = new SQLQuery<PurchaseOrder>(
PurchaseOrder.class, "state = ?");
query.setParameter(1, EPurchaseOrderState.UNPROCESSABLE);
for (PurchaseOrder pu : space.readMultiple(query)) {
logger.debug("PurchaseOrder in Space "+ pu);
}
// Exit with status 0 because the example completed normally
System.exit(0);
}
private static void registerListener(GigaSpace space) {
SimplePollingEventListenerContainer pollingEventListenerContainer = new SimplePollingContainerConfigurer(
space).eventListenerAnnotation(new NewOrderProcessor())
.pollingContainer();
pollingEventListenerContainer.start();
}
}
When you run the above example you will see the following output:
18:56:58.968 [GS-SimplePollingEventListenerContainer-1] DEBUG x.s.e.e.NewOrderProcessor - handling the exception for the 1 time
18:56:58.971 [GS-SimplePollingEventListenerContainer-1] DEBUG x.s.e.e.NewOrderProcessor - handling the exception for the 2 time
18:56:58.971 [GS-SimplePollingEventListenerContainer-1] DEBUG x.s.e.e.NewOrderProcessor - handling the exception for the 3 time
18:56:58.971 [GS-SimplePollingEventListenerContainer-1] DEBUG x.s.e.e.NewOrderProcessor - Max retry count reached throwing exception
18:56:58.972 [GS-SimplePollingEventListenerContainer-1] DEBUG x.s.e.e.PoEventExceptionHandler - Dealing with the exception, change the status to UPROCESSABLE and write it back into the space
18:56:59.968 [main] DEBUG x.s.e.e.Program - PurchaseOrder in Space PurchaseOrder [retryCounter=2, id=c47e2879-ca6a-4531-b073-3f5c09f658cd, number=null, state=UNPROCESSABLE]