Dynamic Polling Container Templates using TriggerOperationHandler
Author | XAP Version | Last Updated | Reference | Download |
---|---|---|---|---|
Shravan (Sean) Kumar | 9.0.0 | | | |
Overview
The Polling Container is one of the most powerful and commonly used features of GigaSpaces for processing data. To recap, a Polling Container performs polling receive operations against the space. If a receive operation succeeds (a value is returned from the receive operation), the Data Event Listener is invoked with the event. The receive operation is performed using a static template. For most use cases a static template or a static SQL query (one whose parameters are constant) is sufficient. Examples include: receive any Order that is marked as “UN_PROCESSED”, receive any Order where the customer name is like “VIPCustomer”, etc.
However, there are use cases that need dynamic templates.
Some examples:
- Processing messages that are older than an hour
- Processing an order only after all the items in an order are in the space
- Processing messages in a certain order
Each of these examples needs a query whose parameters are not all known at configuration time. TriggerOperationHandler helps achieve this behavior.
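To make the "older than an hour" case concrete, here is a minimal plain-Java sketch of why the query parameter cannot be fixed at configuration time: the cutoff depends on the clock at the moment of each poll. The TimedMessage and DynamicCutoffSketch classes are hypothetical and use an in-memory list instead of a space, so this only illustrates the idea and is not GigaSpaces API code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a space entry; not part of the helloworld example.
final class TimedMessage {
    final int id;
    final Instant createdAt;

    TimedMessage(int id, Instant createdAt) {
        this.id = id;
        this.createdAt = createdAt;
    }
}

final class DynamicCutoffSketch {

    // The parameter is derived from "now" on every call, so it cannot be a constant in pu.xml.
    static Instant cutoff(Instant now) {
        return now.minus(Duration.ofHours(1));
    }

    // In-memory equivalent of a query such as "createdAt < ?" with the cutoff bound at poll time.
    static List<Integer> olderThanAnHour(List<TimedMessage> entries, Instant now) {
        Instant limit = cutoff(now);
        return entries.stream()
                .filter(m -> m.createdAt.isBefore(limit))
                .map(m -> m.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2012-06-01T12:00:00Z");
        List<TimedMessage> entries = Arrays.asList(
                new TimedMessage(1, now.minus(Duration.ofMinutes(90))),
                new TimedMessage(2, now.minus(Duration.ofMinutes(10))));
        System.out.println(olderThanAnHour(entries, now)); // prints [1]
    }
}
```

In a real processing unit the same computation would presumably happen inside the trigger, which would then bind the fresh cutoff into the template it returns.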
The Polling Container documentation shows where TriggerOperationHandler fits into the Polling Container life cycle. The Polling Container invokes the TriggerOperationHandler.triggerReceive() method before invoking the ReceiveOperationHandler, which does the actual take. This is the extension point where you can customize or modify the template.
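The ordering described above can be sketched in plain Java. This is a simplified model under stated assumptions, not the container's actual implementation: SketchTrigger, PollingCycleSketch, pollOnce and triggerFor are hypothetical names, a List stands in for the space, and a Predicate stands in for a template.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for a trigger handler; it is not the OpenSpaces interface.
interface SketchTrigger<T> {
    // Returns null to skip this cycle, or a non-null template to let the take proceed.
    Predicate<T> triggerReceive(Predicate<T> configuredTemplate, List<T> space);

    // When true, the take uses the object returned by triggerReceive as its template.
    boolean isUseTriggerAsTemplate();
}

final class PollingCycleSketch {

    // One polling cycle: the trigger runs first, then the receive (take) step.
    static <T> T pollOnce(List<T> space, Predicate<T> configuredTemplate, SketchTrigger<T> trigger) {
        Predicate<T> triggered = trigger.triggerReceive(configuredTemplate, space);
        if (triggered == null) {
            return null; // the trigger found nothing worth taking in this cycle
        }
        Predicate<T> effective = trigger.isUseTriggerAsTemplate() ? triggered : configuredTemplate;
        for (int i = 0; i < space.size(); i++) {
            if (effective.test(space.get(i))) {
                return space.remove(i); // "take": remove and return the first match
            }
        }
        return null;
    }

    // Hypothetical trigger that asks for one specific entry if it is present.
    static SketchTrigger<String> triggerFor(final String wanted, final boolean useAsTemplate) {
        return new SketchTrigger<String>() {
            @Override
            public Predicate<String> triggerReceive(Predicate<String> configured, List<String> space) {
                if (!space.contains(wanted)) {
                    return null;
                }
                return wanted::equals;
            }

            @Override
            public boolean isUseTriggerAsTemplate() {
                return useAsTemplate;
            }
        };
    }

    public static void main(String[] args) {
        Predicate<String> matchAll = s -> true;
        List<String> first = new ArrayList<>(Arrays.asList("low", "high"));
        System.out.println(pollOnce(first, matchAll, triggerFor("high", true)));   // prints high
        List<String> second = new ArrayList<>(Arrays.asList("low", "high"));
        System.out.println(pollOnce(second, matchAll, triggerFor("high", false))); // prints low
        List<String> third = new ArrayList<>(Arrays.asList("low"));
        System.out.println(pollOnce(third, matchAll, triggerFor("high", true)));   // prints null
    }
}
```

The three calls show the three outcomes: the trigger's result replaces the configured template, the trigger only gates the take, and the trigger skips the cycle.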
Example
Below is an example that shows how you can use TriggerOperationHandler to process the Message with the highest priority (assuming id is the priority) across the cluster, and to process it in the same partition where the Message object resides. It is based on the helloworld example that is included with GigaSpaces XAP. This is not possible with a default Polling Container template, but you can achieve it with a custom TriggerOperationHandler.
The MyTrigger implementation is shown below:
    package org.openspaces.example.helloworld.common;

    import org.openspaces.core.GigaSpace;
    import org.openspaces.events.polling.trigger.TriggerOperationHandler;
    import org.springframework.dao.DataAccessException;

    import com.j_spaces.core.client.SQLQuery;

    public class MyTrigger implements TriggerOperationHandler {

        // Clustered proxy, injected from pu.xml, used for the cluster-wide query
        private GigaSpace clusteredGigaSpace;

        @Override
        public Object triggerReceive(Object t, GigaSpace gigaSpace,
                long receiveTimeout) throws DataAccessException {
            // Make the thread wait for new data with a blocking read with timeout.
            // Otherwise the trigger operation handler keeps getting invoked in a
            // tight loop.
            Message template = new Message();
            template.setInfo("Hello ");
            Message newMsg = gigaSpace.read(template, 60000);
            if (newMsg != null) {
                // Highest-priority unprocessed Message (id is the priority)
                SQLQuery<Message> query = new SQLQuery<Message>(Message.class,
                        "processed = false ORDER BY id DESC");
                Message localObject = gigaSpace.read(query);
                // If a local object matches, check that it is also the
                // highest-priority object across the cluster
                if (localObject != null) {
                    Message clusteredObject = clusteredGigaSpace.read(query);
                    if (clusteredObject != null
                            && localObject.getId().equals(clusteredObject.getId())) {
                        return localObject;
                    }
                }
            }
            // Nothing to process on this partition in this cycle
            return null;
        }

        @Override
        public boolean isUseTriggerAsTemplate() {
            // Use the object returned by triggerReceive as the take template
            return true;
        }

        public GigaSpace getClusteredGigaSpace() {
            return clusteredGigaSpace;
        }

        public void setClusteredGigaSpace(GigaSpace clusteredGigaSpace) {
            this.clusteredGigaSpace = clusteredGigaSpace;
        }
    }
MyTrigger runs a cluster-wide query, so it needs a clustered proxy, which is injected from pu.xml. Another useful feature of TriggerOperationHandler is the ability to supply the template that the receive operation handler uses for performing the take. As shown above, isUseTriggerAsTemplate() returns a boolean flag indicating that the receive operation handler should use the object returned by MyTrigger as the template for the take.
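The effect of comparing the local result with the cluster-wide result can be illustrated with a small in-memory model. This is only a sketch under assumptions: PriorityOwnerSketch and its methods are hypothetical, each partition is a plain list of ids, ids are assumed to be unique across the cluster, and the "processed = false" filter is left out for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class PriorityOwnerSketch {

    // Highest id in one partition, mirroring "ORDER BY id DESC" read through the local proxy.
    static Optional<Integer> localHighest(List<Integer> partition) {
        return partition.stream().max(Integer::compare);
    }

    // Highest id across all partitions, mirroring the same query through the clustered proxy.
    static Optional<Integer> clusterHighest(List<List<Integer>> partitions) {
        return partitions.stream().flatMap(List::stream).max(Integer::compare);
    }

    // A partition's trigger lets the take proceed only if it holds the cluster-wide highest id.
    static boolean shouldProcess(int partitionIndex, List<List<Integer>> partitions) {
        Optional<Integer> local = localHighest(partitions.get(partitionIndex));
        return local.isPresent() && local.equals(clusterHighest(partitions));
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 3, 5),   // partition 0
                Arrays.asList(2, 4, 6));  // partition 1
        System.out.println(shouldProcess(0, partitions)); // prints false
        System.out.println(shouldProcess(1, partitions)); // prints true
    }
}
```

Only the partition that owns the highest id passes the check, which is why the example is most instructive on a cluster with more than one partition.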
The pu.xml snippet below shows how MyTrigger is configured on the polling container:
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>
    <os-core:giga-space id="clusteredGigaSpace" clustered="true" space="space" tx-manager="transactionManager"/>

    <!--
        The processor bean
    -->
    <bean id="helloProcessor" class="org.openspaces.example.helloworld.processor.Processor"/>

    <os-events:polling-container id="helloProcessorPollingEventContainer"
                                 giga-space="gigaSpace">
        <os-events:tx-support tx-manager="transactionManager"/>
        <os-events:trigger-operation-handler>
            <bean class="org.openspaces.example.helloworld.common.MyTrigger">
                <property name="clusteredGigaSpace" ref="clusteredGigaSpace"/>
            </bean>
        </os-events:trigger-operation-handler>
        <os-core:template>
            <bean class="org.openspaces.example.helloworld.common.Message">
                <property name="info" value="Hello "/>
            </bean>
        </os-core:template>
        <os-events:listener>
            <os-events:annotation-adapter>
                <os-events:delegate ref="helloProcessor"/>
            </os-events:annotation-adapter>
        </os-events:listener>
    </os-events:polling-container>
Notice the clustered proxy being passed to MyTrigger as a property.
Getting the project
The example project is hosted on GitHub in the best practices project. This is an umbrella repository; the specific project is in the helloTriggerHandler directory under the root.
You can run this example the same way you would run the helloworld example, using the included ant build scripts. Be sure to use a cluster with at least 2 partitions when testing this.