Overview


The polling event container is an implementation of the polling consumer pattern which uses the space to receive events. It performs polling receive operations against the space. If a receive operation succeeds (a value is returned from the receive operation), the Data Event Listener is invoked with the event. A polling event operation is mainly used when simulating Queue semantics or when using the master-worker design pattern.
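The polling consumer pattern described above can be illustrated with a minimal plain-Java sketch. It is not GigaSpaces code: a BlockingQueue stands in for the space, and PollingConsumerSketch, pollOnce and drain are hypothetical names chosen for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java illustration of a polling consumer. The queue stands in for the
// space; none of these names belong to the GigaSpaces API.
final class PollingConsumerSketch {

    // Performs one receive attempt with a timeout. The listener is invoked
    // only when the receive operation actually returns a value.
    static <T> boolean pollOnce(BlockingQueue<T> source, long timeoutMillis, Consumer<T> listener) {
        T event;
        try {
            event = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (event == null) {
            return false; // receive timed out, so the listener is not invoked
        }
        listener.accept(event);
        return true;
    }

    // Polls repeatedly until a receive attempt times out and returns the
    // events the listener handled, in the order they were received.
    static List<String> drain(BlockingQueue<String> source, long timeoutMillis) {
        List<String> handled = new ArrayList<>();
        while (pollOnce(source, timeoutMillis, handled::add)) {
            // keep polling while events keep arriving
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        source.add("order-1");
        source.add("order-2");
        System.out.println(drain(source, 50)); // prints [order-1, order-2]
    }
}
```

Because each event is removed by exactly one successful receive, several such consumers polling the same source would share the work, which is the queue and master-worker behavior mentioned above.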

Life Cycle Events

The polling container life cycle events are described below. You may implement each of these to perform the desired activity.

dynamic_polling_container_life_cycle.jpg

Configuration

Here is a simple example of polling event container configuration:

<!-- Enable scan for OpenSpaces and Spring components -->
<context:component-scan base-package="com.mycompany"/>

<!-- Enable support for @Polling annotation -->
<os-events:annotation-support />

<os-core:embedded-space id="space" space-name="mySpace"/>

<os-core:giga-space id="gigaSpace" space="space"/>
@EventDriven @Polling
public class SimpleListener {

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        // process Data here, then mark it so it no longer matches the template
        event.setProcessed(true);
        return event;
    }
}
<os-core:embedded-space id="space" space-name="mySpace"/>
<os-core:giga-space id="gigaSpace" space="space"/>
<bean id="simpleListener" class="SimpleListener" />
<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
<bean id="space" class="org.openspaces.core.space.EmbeddedSpaceFactoryBean">
    <property name="name" value="space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
GigaSpace gigaSpace = // either create the GigaSpace or get it by injection

SimplePollingEventListenerContainer pollingEventListenerContainer = new SimplePollingContainerConfigurer(gigaSpace)
                .template(new Data(false))
                .eventListenerAnnotation(new Object() {
                    @SpaceDataEvent
                    public void eventHappened() {
                        eventCalled.set(true);
                    }
                }).pollingContainer();

// start the polling container
pollingEventListenerContainer.start();
......
// when needed, dispose of the polling container
pollingEventListenerContainer.destroy();
Restriction

@EventDriven, @Polling, and @Notify cannot be placed on interfaces. Place them on the implementation class.

The example above performs single take operations (see below) using the provided template (a Data object with its processed flag set to false). If the take operation succeeds (a value is returned), the SimpleListener is invoked. The operations are performed on the configured GigaSpace bean (in this case, if working in a clustered topology, it is performed directly on the cluster member).

Primary/Backup

The polling event container, by default, performs receive operations only when the relevant space it is working against is in primary mode. When the space is in backup mode, no receive operations are performed. If the space moves from backup mode to primary mode, the receive operations are started.

This mostly applies when working with an embedded space directly with a cluster member. When working with a clustered space (performing operations against the whole cluster), the mode of the space is always primary.
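The primary/backup behavior can be pictured with the following plain-Java sketch. It only illustrates the gating idea under simplifying assumptions; PrimaryBackupSketch, SpaceMode and onModeChange are hypothetical names and do not reflect how the container tracks the space mode internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration only: receive operations are skipped while the (simulated)
// space is in backup mode and resume once it becomes primary.
final class PrimaryBackupSketch {

    enum SpaceMode { PRIMARY, BACKUP }

    private volatile SpaceMode mode = SpaceMode.BACKUP;
    private final Deque<String> space = new ArrayDeque<>();

    void write(String event) {
        space.addLast(event);
    }

    // Stands in for a notification that the space mode has changed.
    void onModeChange(SpaceMode newMode) {
        mode = newMode;
    }

    // Performs a receive only in primary mode; in backup mode nothing is consumed.
    String receive() {
        if (mode != SpaceMode.PRIMARY) {
            return null;
        }
        return space.pollFirst();
    }

    public static void main(String[] args) {
        PrimaryBackupSketch container = new PrimaryBackupSketch();
        container.write("event-1");
        System.out.println(container.receive()); // prints null (backup mode)
        container.onModeChange(SpaceMode.PRIMARY);
        System.out.println(container.receive()); // prints event-1
    }
}
```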

FIFO Grouping

FIFO Grouping is designed to allow efficient processing of events with partial ordering constraints. Instead of maintaining one FIFO queue per class type, it gives you a finer level of granularity by maintaining a FIFO queue for each distinct value of a specific property.

See also:

For more details see FIFO grouping.
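As a rough mental model of the partial ordering, the sketch below keeps one FIFO queue per value of a grouping property. It is a plain-Java illustration and not the space's actual implementation; FifoGroupingSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: one FIFO queue per value of a grouping property.
// This is not how the space implements FIFO grouping internally.
final class FifoGroupingSketch {

    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    // Appends an event to the queue that belongs to its group value.
    void write(String groupValue, String event) {
        queues.computeIfAbsent(groupValue, key -> new ArrayDeque<>()).addLast(event);
    }

    // Takes the oldest event of one group; other groups are unaffected.
    String takeNext(String groupValue) {
        Deque<String> queue = queues.get(groupValue);
        return (queue == null) ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        FifoGroupingSketch space = new FifoGroupingSketch();
        space.write("accountA", "deposit");
        space.write("accountB", "transfer");
        space.write("accountA", "withdraw");
        System.out.println(space.takeNext("accountA")); // prints deposit
        System.out.println(space.takeNext("accountB")); // prints transfer
        System.out.println(space.takeNext("accountA")); // prints withdraw
    }
}
```

Events for accountA are always seen in write order, while accountB can be processed independently of them.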

Static Template Definition

When performing receive operations, a template is defined, creating a virtualized subset of data within the space that matches it. GigaSpaces supports templates based on the actual domain model (with null values denoting wildcards), which are shown in the examples. GigaSpaces allows the use of SQLQuery in order to query the space, which can be easily used with the event container as the template. Here is an example of how it can be defined:

@EventDriven @Polling
public class SimpleListener {

    @EventTemplate
    SQLQuery<Data> unprocessedData() {
        SQLQuery<Data> template = new SQLQuery<Data>(Data.class, "processed = true");
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        // process Data here
        return null; // a null result writes nothing back to the space
    }
}
<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-core:sql-query where="processed = true" class="org.openspaces.example.data.common.Data"/>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>

</os-events:polling-container>
<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="template">
        <bean class="com.j_spaces.core.client.SQLQuery">
            <constructor-arg index="0" value="org.openspaces.example.data.common.Data" />
            <constructor-arg index="1" value="processed = true" />
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
Restriction

A polling container or notify container can have only one template. If you need event handlers for several templates, create another polling container or notify container for each one. If you use multiple polling containers, make sure their templates do not overlap.

Multiple Event Handlers

It is possible to define multiple event handlers for a polling container. If you have a superclass that has subclasses, and you want to define event handlers for each subclass, you can define the event template for the superclass and a @SpaceDataEvent for each subclass.

Here is an example where HostInfo, MachineInfo and LdapInfo are subclasses of the MonitorInfo class:

@EventDriven
@Polling
public class PollingExample {

	@EventTemplate
	public SQLQuery<MonitorInfo> dataTemplate() {
		return new SQLQuery<MonitorInfo>(MonitorInfo.class, "");
	}

	@SpaceDataEvent
	public MachineInfo eventListener(final MachineInfo event) {
		// ..........
		return null;
	}

	@SpaceDataEvent
	public MachineInfo eventListener(final HostInfo event) {
		// ..........
		return null;
	}

	@SpaceDataEvent
	public MachineInfo eventListener(final LdapInfo event) {
		// ..........
		return null;
	}
}
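The idea behind the example above, one superclass template with one handler per subclass, can be sketched in plain Java as a dispatch on the runtime type of the event. The classes below are empty stand-ins for the ones named in the text, and HandlerDispatchSketch is a hypothetical illustration; the way the annotation adapter actually selects a method is not shown here.

```java
// Stand-in classes mirroring the hierarchy named in the text; the real
// classes would be space POJOs with their own properties.
class MonitorInfo { }
class HostInfo extends MonitorInfo { }
class MachineInfo extends MonitorInfo { }
class LdapInfo extends MonitorInfo { }

final class HandlerDispatchSketch {

    // One handler per subclass, like one @SpaceDataEvent method per subclass.
    static String handle(HostInfo event) { return "host"; }
    static String handle(MachineInfo event) { return "machine"; }
    static String handle(LdapInfo event) { return "ldap"; }

    // Routes an event matched by the superclass template to the handler
    // whose parameter type fits the event's runtime type.
    static String dispatch(MonitorInfo event) {
        if (event instanceof HostInfo) {
            return handle((HostInfo) event);
        }
        if (event instanceof MachineInfo) {
            return handle((MachineInfo) event);
        }
        if (event instanceof LdapInfo) {
            return handle((LdapInfo) event);
        }
        return "unhandled"; // plain MonitorInfo or an unknown subclass
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new LdapInfo())); // prints ldap
    }
}
```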

Multiple Values Template

You may use a SQLQuery with the IN operator to register a template that matches multiple values. This can be a simple alternative to using multiple polling containers. See the example below:

import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceIndex;

public class MyData {
    String id;
    String key;

    @SpaceId(autoGenerate=false)
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }

    @SpaceIndex
    public String getKey() {
        return key;
    }
    public void setKey(String key) {
        this.key = key;
    }
    @Override
    public String toString() {
        return "MyData [id=" + id + ", key=" + key + "]";
    }
}
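// 'space' is a GigaSpace instance, created or injected elsewhere.
// The key values 'A', 'B' and 'C' are illustrative only.
SQLQuery<MyData> query = new SQLQuery<MyData>(MyData.class, "key IN ('A', 'B', 'C')");

SimplePollingEventListenerContainer pollingEventListenerContainer =
    new SimplePollingContainerConfigurer(space)
        .template(query)
        .eventListenerAnnotation(new Object() {
            @SpaceDataEvent
            public void eventHappened(MyData data) {
                System.out.println("Polling Container Got matching event! - " + data);
            }
        }).pollingContainer();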

Dynamic Template Definition

When performing polling receive operations, a dynamic template can be used. A method providing a dynamic template is called before each receive operation, and can return a different object in each call. The event template object has the same syntax rules as with @EventTemplate.

@EventDriven @Polling
public class SimpleListener {

    @DynamicEventTemplate
    SQLQuery<Data> unprocessedExpiredData() {
        long expired = System.currentTimeMillis() - 60000;
        SQLQuery<Data> dynamicTemplate =
          new SQLQuery<Data>(Data.class, "processed = true AND timestamp < " + expired);
        return dynamicTemplate;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        // process Data here
        return null; // a null result writes nothing back to the space
    }
}
<os-events:polling-container id="eventContainer" giga-space="gigaSpace">

    <os-events:dynamic-template ref="dynamicTemplate" />

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>

</os-events:polling-container>
<bean id="dynamicTemplate" class="ExpiredDataTemplateProvider"/>
public class ExpiredDataTemplateProvider implements DynamicEventTemplateProvider {

    @Override
    public Object getDynamicTemplate() {
        long expired = System.currentTimeMillis() - 60000;
        SQLQuery<Data> dynamicTemplate =
          new SQLQuery<Data>(Data.class, "processed = true AND timestamp < " + expired);
        return dynamicTemplate;
    }
}
<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />
    <property name="dynamicTemplate" ref="dynamicTemplate" />

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>

Only polling containers support dynamic templates. Notify containers do not support dynamic templates.
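The key property of a dynamic template, that it is re-evaluated before every receive operation, can be demonstrated with a small plain-Java sketch. A Supplier plays the role of the dynamic template method and a list stands in for the space; DynamicTemplateSketch, Entry and olderThan are hypothetical names, and the injected clock is an assumption made so the behavior can be shown without waiting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class DynamicTemplateSketch {

    // Hypothetical stand-in for a Data entry that carries a timestamp.
    static final class Entry {
        final long timestamp;
        Entry(long timestamp) { this.timestamp = timestamp; }
    }

    // Plays the role of the dynamic template method: every call computes a
    // new cutoff from the clock, so each receive can use a different template.
    static Supplier<Predicate<Entry>> olderThan(long ageMillis, LongSupplier clock) {
        return () -> {
            long cutoff = clock.getAsLong() - ageMillis;
            return entry -> entry.timestamp < cutoff;
        };
    }

    // One receive operation: fetch the template first, then take the first match.
    static Entry receive(List<Entry> space, Supplier<Predicate<Entry>> dynamicTemplate) {
        Predicate<Entry> template = dynamicTemplate.get();
        for (Iterator<Entry> it = space.iterator(); it.hasNext();) {
            Entry candidate = it.next();
            if (template.test(candidate)) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        long[] now = {100_000L};
        List<Entry> space = new ArrayList<>();
        space.add(new Entry(50_000L));
        Supplier<Predicate<Entry>> template = olderThan(60_000L, () -> now[0]);

        System.out.println(receive(space, template) != null); // prints false: entry is only 50 s old
        now[0] = 120_000L;                                    // simulated time passes
        System.out.println(receive(space, template) != null); // prints true: entry is now 70 s old
    }
}
```

A static template would have fixed the cutoff once, at configuration time, so the entry in this example would never have become eligible.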

Receive Operation Handler

The polling receive container performs receive operations. The actual implementation of the receive operation is abstracted using the following interface:

public interface ReceiveOperationHandler {
    /**
     * Performs the actual receive operation. Return values allowed are single object or an array of
     * objects.
     *
     * @param template
     *            The template to use for the receive operation.
     * @param gigaSpace
     *            The GigaSpace interface to perform the receive operations with
     * @param receiveTimeout
     *            Receive timeout value
     * @return The receive result. <code>null</code> indicating no receive occurred. Single object
     *         or an array of objects indicating the receive operation result.
     * @throws DataAccessException
     */
    Object receive(Object template, GigaSpace gigaSpace, long receiveTimeout) throws DataAccessException;
}

XAP comes with several built-in receive operation-handler implementations:

Receive Operation Handler | Description
SingleTakeReceiveOperationHandler | Performs a single blocking take operation with the receive timeout.
SingleReadReceiveOperationHandler | Performs a single blocking read operation with the receive timeout.
ExclusiveReadReceiveOperationHandler | Performs a single read operation under an exclusive read lock (similar to “select for update” in databases) with the receive timeout. Exclusive read lock mimics the take operation without actually taking the Entry from the space. This receive operation handler must be used within a transaction.
MultiTakeReceiveOperationHandler | First tries to perform takeMultiple (using a configured max Entries). If no values are returned, performs a blocking take operation with the receive timeout.
MultiReadReceiveOperationHandler | First tries to perform readMultiple (using a configured max Entries). If no values are returned, performs a blocking read operation with the receive timeout.
MultiExclusiveReadReceiveOperationHandler | First tries to perform readMultiple (using a configured max Entries). If no values are returned, performs a blocking read operation with the receive timeout. Both read operations are performed under an exclusive read lock (similar to “select for update” in databases) which mimics a take operation without actually taking the Entry from the space. This receive operation handler must be used within a transaction.

When using the ExclusiveReadReceiveOperationHandler or the SingleReadReceiveOperationHandler, it is important to remember that the actual event remains in the space. Unless the data event is taken from the space, or one of its properties is changed so that it no longer matches the container template, the same data event is read again.
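The difference between read-based and take-based receive operations can be seen in a few lines of plain Java. The sketch below uses a list as a stand-in for the space and a minimal Data class with a processed flag; ReadVersusTakeSketch and its methods are hypothetical and only mimic the matching rule "processed = false".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ReadVersusTakeSketch {

    // Hypothetical stand-in for the Data class used in the examples.
    static final class Data {
        boolean processed;
    }

    // Read: returns the first unprocessed entry and leaves it in the space.
    static Data read(List<Data> space) {
        for (Data candidate : space) {
            if (!candidate.processed) {
                return candidate;
            }
        }
        return null;
    }

    // Take: returns the first unprocessed entry and removes it from the space.
    static Data take(List<Data> space) {
        for (Iterator<Data> it = space.iterator(); it.hasNext();) {
            Data candidate = it.next();
            if (!candidate.processed) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Data> space = new ArrayList<>();
        space.add(new Data());

        Data first = read(space);
        Data second = read(space);
        System.out.println(first == second); // prints true: the same event is read again

        first.processed = true;              // the entry no longer matches the template
        System.out.println(read(space));     // prints null
    }
}
```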

Here is an example of how the receive operation handler can be configured with MultiTakeReceiveOperationHandler:

@EventDriven @Polling
public class SimpleListener {

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        MultiTakeReceiveOperationHandler receiveHandler = new MultiTakeReceiveOperationHandler();
        receiveHandler.setMaxEntries(100);
        return receiveHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        // process Data here, then mark it so it no longer matches the template
        event.setProcessed(true);
        return event;
    }
}
<os-core:embedded-space id="space" space-name="mySpace"/>

<os-core:giga-space id="gigaSpace" space="space"/>

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">
    <os-events:receive-operation-handler>
        <bean class="org.openspaces.events.polling.receive.MultiTakeReceiveOperationHandler" />
    </os-events:receive-operation-handler>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
<bean id="space" class="org.openspaces.core.space.EmbeddedSpaceFactoryBean">
    <property name="name" value="space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="receiveOperationHandler">
        <bean class="org.openspaces.events.polling.receive.MultiTakeReceiveOperationHandler" />
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>

Non-Blocking Receive Handler

When working with a partitioned cluster and configuring the remote polling container to work against the whole cluster, blocking operations (take with a timeout>0) are not allowed (when the routing field is not set on the template or SQLQuery). The default receive operation handlers support performing the receive operation in a non-blocking manner, by sleeping between non-blocking operations. For example, the SingleTakeReceiveOperationHandler performs a non-blocking take operation against the space and then sleeps for a configurable amount of time. A classic scenario where the Non-Blocking mode would be used is the Master-Worker Pattern.

Step 1 - The Master sends requests to be processed by the workers, which are implemented using the polling container:

master_worker_rr1.jpg

Step 2 - The workers generate results, which are consumed by the Master:

master_worker_rr2.jpg

Here is an example of how a Non-Blocking mode can be configured:

@EventDriven @Polling (receiveTimeout=10000)
public class SimpleListener {

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
        receiveHandler.setNonBlocking(true);
        receiveHandler.setNonBlockingFactor(10);
        return receiveHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        // process Data here, then mark it so it no longer matches the template
        event.setProcessed(true);
        return event;
    }
}
<os-core:embedded-space id="space" space-name="mySpace"/>

<os-core:giga-space id="gigaSpace" space="space"/>

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace" receive-timeout="10000">
    <os-events:receive-operation-handler>
        <bean class="org.openspaces.events.polling.receive.SingleTakeReceiveOperationHandler">
            <property name="nonBlocking" value="true" />
            <property name="nonBlockingFactor" value="10" />
        </bean>
    </os-events:receive-operation-handler>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
<bean id="space" class="org.openspaces.core.space.EmbeddedSpaceFactoryBean">
    <property name="name" value="space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="receiveTimeout" value="10000" />

    <property name="receiveOperationHandler">
        <bean class="org.openspaces.events.polling.receive.SingleTakeReceiveOperationHandler">
            <property name="nonBlocking" value="true" />
            <property name="nonBlockingFactor" value="10" />
        </bean>
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>

The above example uses a receive timeout of 10 seconds (10000 milliseconds). The SingleTakeReceiveOperationHandler is configured to be non-blocking with a non-blocking factor of 10. This means that the receive handler performs 10 non-blocking takes within 10 seconds and sleeps the rest of the time (~1 second each time).
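The arithmetic above (a 10000 ms receive timeout divided by a factor of 10 gives roughly 1000 ms of sleep per attempt) can be checked with a small simulation. This is a sketch under the assumption that the sleep interval is simply the receive timeout divided by the non-blocking factor; the real handler's timing logic may differ. NonBlockingReceiveSketch and the injected sleeper are hypothetical, the latter so the example runs without actually sleeping.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class NonBlockingReceiveSketch {

    // Assumed rule: the timeout is split evenly across the attempts.
    static long sleepPerAttempt(long receiveTimeoutMillis, int nonBlockingFactor) {
        return receiveTimeoutMillis / nonBlockingFactor;
    }

    // Tries a non-blocking take up to nonBlockingFactor times, sleeping between
    // unsuccessful attempts. Returns the event, or null if every attempt failed.
    static <T> T receive(Supplier<T> nonBlockingTake, long receiveTimeoutMillis,
                         int nonBlockingFactor, LongConsumer sleeper) {
        long sleepMillis = sleepPerAttempt(receiveTimeoutMillis, nonBlockingFactor);
        for (int attempt = 0; attempt < nonBlockingFactor; attempt++) {
            T event = nonBlockingTake.get();
            if (event != null) {
                return event;
            }
            sleeper.accept(sleepMillis); // a real handler would sleep here
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(sleepPerAttempt(10_000, 10)); // prints 1000

        long[] slept = {0};
        String result = NonBlockingReceiveSketch.<String>receive(
                () -> null, 10_000, 10, ms -> slept[0] += ms);
        System.out.println(result + " after " + slept[0] + " ms"); // prints null after 10000 ms
    }
}
```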

Batch Operations and passArrayAsIs

Processing data in batches may improve processing throughput. Instead of consuming objects from the space one at a time and processing each of them, you may consume a batch of multiple objects and process them in one transaction. This may improve the overall throughput rate, but may increase the latency of processing an individual object.

The following benchmark results compare the different options:

poll_bench.jpg

See the full benchmark description in the "How to Implement my Processor? - The Polling Container Benchmark" post on the GigaSpaces blog site.

You may use batching via the MultiTakeReceiveOperationHandler. The MultiTakeReceiveOperationHandler.setMaxEntries(integer) method sets the maximum number of objects to be consumed with each polling event. If the space does not hold enough matching objects at polling time, the event listener method is called with the matching objects that do exist (fewer than the MaxEntries value). There is no delay in such a case: the polling container does not wait until the exact number of matching objects specified via MaxEntries is available.

Certain receive operation handlers might return an array as a result of the receive operation. A prime example is the MultiTakeReceiveOperationHandler, which might return an array as a result of a takeMultiple operation called by the polling container. By default, the polling container unpacks the array and invokes the event listener method once for each element. If you want the event listener to operate on the whole array (receive the array as a parameter of the event listener method), the passArrayAsIs attribute of the @Polling annotation should be set to true.
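The two delivery modes can be compared with a short plain-Java sketch. It only models the behavior described in the paragraph above; PassArrayAsIsSketch and deliver are hypothetical names, not part of the container API.

```java
import java.util.function.Consumer;

final class PassArrayAsIsSketch {

    // Delivers a receive result to the listener and returns how many times
    // the listener was invoked. An array result is unpacked element by
    // element unless passArrayAsIs is true.
    static int deliver(Object receiveResult, boolean passArrayAsIs, Consumer<Object> listener) {
        if (receiveResult == null) {
            return 0; // nothing was received
        }
        if (receiveResult instanceof Object[] && !passArrayAsIs) {
            Object[] batch = (Object[]) receiveResult;
            for (Object element : batch) {
                listener.accept(element);
            }
            return batch.length;
        }
        listener.accept(receiveResult); // single object, or the whole array as is
        return 1;
    }

    public static void main(String[] args) {
        String[] batch = {"a", "b", "c"};
        System.out.println(deliver(batch, false, event -> { })); // prints 3
        System.out.println(deliver(batch, true, event -> { }));  // prints 1
    }
}
```

With passArrayAsIs enabled the listener sees one invocation per batch, which is what makes it possible to process and write back the whole batch in a single operation.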

Here is an example of batch processing using passArrayAsIs. In this example the polling container consumes a batch of objects using takeMultiple, modifies them, and writes them back into the space in one operation using writeMultiple:

@EventDriven
@Polling(passArrayAsIs = true)
public class SimpleBatchListener {

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        MultiTakeReceiveOperationHandler receiveHandler = new MultiTakeReceiveOperationHandler();
        receiveHandler.setMaxEntries(100);
        return receiveHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data[] eventListener(Data events[]) {
        //process Data within a loop
        for (int i = 0; i < events.length; i++) {
           events[i].setProcessed(true);
        }
        return events;
    }
}
<bean id="space" class="org.openspaces.core.space.EmbeddedSpaceFactoryBean">
    <property name="name" value="space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>
<bean id="simpleBatchListener" class="SimpleBatchListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="passArrayAsIs" value="true" />

    <property name="receiveOperationHandler">
        <bean class="org.openspaces.events.polling.receive.MultiTakeReceiveOperationHandler">
            <property name="maxEntries" value="100" />
        </bean>
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleBatchListener" />
        </bean>
    </property>
</bean>

Free Polling Container Resources

To free the resources used by the polling container, make sure you close it properly. A good place for the destroy() call is a @PreDestroy method or the DisposableBean.destroy() method.

Polling Container Lifecycle

The Polling Event Listener container supports several life cycle methods. These allow you to create, start, stop and destroy the listener programmatically. There is also the setActiveWhenPrimary lifecycle mode, which binds the container to the Space mode when set to true: the container is started when the Space mode is PRIMARY and stopped otherwise.

You can get the exact status of the Polling Event Listener container using the isActive() and isRunning() methods. Here is a simple example illustrating the different Polling Event Listener container life cycle methods:

import java.util.Calendar;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.EmbeddedSpaceConfigurer;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.events.polling.SimplePollingContainerConfigurer;
import org.openspaces.events.polling.SimplePollingEventListenerContainer;

public class PollingContainerLifeCycleMain {
    static SimplePollingEventListenerContainer pollingEventListenerContainer;
    static GigaSpace gigaSpace;

    public static void main(String[] args) throws Exception {

        gigaSpace = new GigaSpaceConfigurer(new EmbeddedSpaceConfigurer("mySpace")).gigaSpace();

        // Write data to the space
        gigaSpace.write(new Data());
        say("wrote object to space");
        say("pollingContainer about to be created");

        // create a polling listener
        pollingEventListenerContainer = new SimplePollingContainerConfigurer(gigaSpace).template(new Data())
                .autoStart(false).eventListenerAnnotation(new Object() {
                    @SpaceDataEvent
                    public void eventHappened() {
                        say("event consumed");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }).pollingContainer();

        say("pollingContainer created");
        Thread.sleep(1000);
        say("pollingContainer about to be started");
        pollingEventListenerContainer.start();
        say("pollingContainer started");
        Thread.sleep(1000);

        say("pollingContainer about to be stopped");
        pollingEventListenerContainer.stop();
        say("pollingContainer stopped");
        Thread.sleep(1000);

        say("pollingContainer about to be restarted");
        pollingEventListenerContainer.start();
        say("pollingContainer started");
        Thread.sleep(1000);

        say("pollingContainer about to be destroyed");
        pollingEventListenerContainer.destroy();
        say("pollingContainer destroyed");
        System.exit(0);
    }

    static public void say(String mes) {
        Calendar d = Calendar.getInstance();

        int ms = Calendar.getInstance().get(Calendar.MILLISECOND);
        String t = d.getTime() + ":" + ms;

        if (pollingEventListenerContainer == null)
            System.out.println(t + " - " + " isActive:" + "false" + " isRunning:" + "false" + " " + mes);
        else
            System.out.println(t + " - " + " isActive:" + pollingEventListenerContainer.isActive() + " isRunning:"
                    + pollingEventListenerContainer.isRunning() + " " + mes);
    }
}
import com.gigaspaces.annotation.pojo.SpaceId;

public class Data {

    private String id;

    public Data() {
    }

    @SpaceId(autoGenerate = true)
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

When running the above example, the following output is displayed:

Sun Jul 31 13:24:24 CAT 2016:466 -  isActive:false isRunning:false wrote object to space
Sun Jul 31 13:24:24 CAT 2016:466 -  isActive:false isRunning:false pollingContainer about to be created
Sun Jul 31 13:24:24 CAT 2016:484 -  isActive:true isRunning:false pollingContainer created
Sun Jul 31 13:24:25 CAT 2016:484 -  isActive:true isRunning:false pollingContainer about to be started
Sun Jul 31 13:24:25 CAT 2016:484 -  isActive:true isRunning:true pollingContainer started
Sun Jul 31 13:24:25 CAT 2016:511 -  isActive:true isRunning:true event consumed
Sun Jul 31 13:24:26 CAT 2016:486 -  isActive:true isRunning:true pollingContainer about to be stopped
Sun Jul 31 13:24:26 CAT 2016:486 -  isActive:true isRunning:false pollingContainer stopped
Sun Jul 31 13:24:27 CAT 2016:486 -  isActive:true isRunning:false pollingContainer about to be restarted
Sun Jul 31 13:24:27 CAT 2016:486 -  isActive:true isRunning:true pollingContainer started
Sun Jul 31 13:24:28 CAT 2016:487 -  isActive:true isRunning:true pollingContainer about to be destroyed
Sun Jul 31 13:24:28 CAT 2016:489 -  isActive:false isRunning:false pollingContainer destroyed

Trigger Receive Operation

When configuring the polling event container to perform its receive operation and event actions under a transaction, a transaction is started and rolled back for each unsuccessful receive operation, which results in a higher load on the space. The polling event container allows pluggable logic to be used in order to decide if the actual receive operation should be performed or not. This logic, called the trigger receive operation, is performed outside the receive transaction boundaries. The following interface is provided for custom implementation of this logic:

public interface TriggerOperationHandler {
    /**
     * Allows to perform a trigger receive operation which control if the active receive operation
     * will be performed in a polling event container. This feature is mainly used when having
     * polling event operations with transactions where the trigger receive operation is performed
     * outside of a transaction, thus reducing the creation of transactions that do not perform the
     * actual receive operation.
     *
     * <p>
     * If this operation returns a non <code>null</code> value, it means that the receive
     * operation should take place. If it returns a <code>null</code> value, no receive operation
     * will be attempted.
     *
     * @param template
     *            The template to use for the receive operation.
     * @param gigaSpace
     *            The GigaSpace interface to perform the receive operations with
     * @param receiveTimeout
     *            Receive timeout value
     * @throws DataAccessException
     *
     */
    Object triggerReceive(Object template, GigaSpace gigaSpace, long receiveTimeout) throws DataAccessException;

    /**
     * Controls if the object returned from
     * {@link #triggerReceive(Object,org.openspaces.core.GigaSpace,long)} will be used as the
     * template for the receive operation by returning <code>true</code>. If <code>false</code>
     * is returned, the actual template configured in the polling event container will be used.
     */
    boolean isUseTriggerAsTemplate();
}
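
To make the effect of a trigger concrete, the following is a minimal, self-contained simulation of the polling cycle described above. It is a sketch only: `FakeSpace`, `Trigger`, and `poll` are hypothetical stand-ins written for this example and are not OpenSpaces classes. The "transaction" is just a pair of counters. The sketch compares how many transactions are started and rolled back with and without a read-style trigger.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simulation only; none of these types belong to OpenSpaces. */
class TriggerSketch {

    /** Stand-in for the space: an in-memory queue plus transaction counters. */
    static final class FakeSpace {
        private final Queue<String> entries = new ArrayDeque<>();
        int transactionsStarted = 0;
        int transactionsRolledBack = 0;

        void write(String entry) {
            entries.add(entry);
        }

        /** Non-destructive peek, performed outside any transaction. */
        String read() {
            return entries.peek();
        }

        /** Destructive receive inside a simulated transaction. */
        String takeInTransaction() {
            transactionsStarted++;
            String entry = entries.poll();
            if (entry == null) {
                transactionsRolledBack++; // nothing received: the transaction was wasted
            }
            return entry;
        }
    }

    /** Stand-in trigger: a non-null result means "perform the receive". */
    interface Trigger {
        Object triggerReceive(FakeSpace space);
    }

    /** Runs a fixed number of polling cycles and returns the number of events received. */
    static int poll(FakeSpace space, Trigger trigger, int cycles) {
        int received = 0;
        for (int i = 0; i < cycles; i++) {
            if (trigger != null && trigger.triggerReceive(space) == null) {
                continue; // trigger found nothing: skip the transactional receive
            }
            if (space.takeInTransaction() != null) {
                received++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        FakeSpace noTrigger = new FakeSpace();
        noTrigger.write("a");
        noTrigger.write("b");
        poll(noTrigger, null, 10);

        FakeSpace readTrigger = new FakeSpace();
        readTrigger.write("a");
        readTrigger.write("b");
        poll(readTrigger, FakeSpace::read, 10);

        // prints "no trigger: started=10 rolledBack=8"
        System.out.println("no trigger: started=" + noTrigger.transactionsStarted
                + " rolledBack=" + noTrigger.transactionsRolledBack);
        // prints "read trigger: started=2 rolledBack=0"
        System.out.println("read trigger: started=" + readTrigger.transactionsStarted
                + " rolledBack=" + readTrigger.transactionsRolledBack);
    }
}
```

In this single-threaded simulation the trigger removes every wasted transaction. With several concurrent consumers, another consumer may take the entry between the read and the take, so a trigger only raises the probability that the receive succeeds.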

OpenSpaces comes with a built-in implementation of this interface, called ReadTriggerOperationHandler. It performs a single blocking read operation (using the provided receive timeout), thus “peeking” into the space for relevant event data. If the read operation returns a value, there is a higher probability that the receive operation will succeed, so a transaction is less likely to be started for nothing. Here is how it can be configured:

@EventDriven @Polling @TransactionalEvent
public class SimpleListener {

    @TriggerHandler
    TriggerOperationHandler receiveHandler() {
        ReadTriggerOperationHandler triggerHandler = new ReadTriggerOperationHandler();
        return triggerHandler;
    }

    @EventTemplate
    Data unprocessedData() {
        Data template = new Data();
        template.setProcessed(false);
        return template;
    }

    @SpaceDataEvent
    public Data eventListener(Data event) {
        // process Data here, then return it so that it is written back to the space
        event.setProcessed(true);
        return event;
    }
}
<os-core:embedded-space id="space" space-name="mySpace"/>

<os-core:giga-space id="gigaSpace" space="space"/>

<os-core:distributed-tx-manager id="transactionManager" />

<bean id="simpleListener" class="SimpleListener" />

<os-events:polling-container id="eventContainer" giga-space="gigaSpace">
    <os-events:tx-support tx-manager="transactionManager"/>

    <os-events:trigger-operation-handler>
        <bean class="org.openspaces.events.polling.trigger.ReadTriggerOperationHandler" />
    </os-events:trigger-operation-handler>

    <os-core:template>
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </os-core:template>

    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="simpleListener"/>
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>
<bean id="space" class="org.openspaces.core.space.EmbeddedSpaceFactoryBean">
    <property name="name" value="space" />
</bean>

<bean id="gigaSpace" class="org.openspaces.core.GigaSpaceFactoryBean">
    <property name="space" ref="space" />
</bean>

<bean id="transactionManager" class="org.openspaces.core.transaction.manager.DistributedJiniTransactionManager" />

<bean id="simpleListener" class="SimpleListener" />

<bean id="eventContainer" class="org.openspaces.events.polling.SimplePollingEventListenerContainer">
    <property name="transactionManager" ref="transactionManager" />

    <property name="gigaSpace" ref="gigaSpace" />

    <property name="triggerOperationHandler">
        <bean class="org.openspaces.events.polling.trigger.ReadTriggerOperationHandler" />
    </property>

    <property name="template">
        <bean class="org.openspaces.example.data.common.Data">
            <property name="processed" value="false"/>
        </bean>
    </property>

    <property name="eventListener">
        <bean class="org.openspaces.events.adapter.AnnotationEventListenerAdapter">
            <property name="delegate" ref="simpleListener" />
        </bean>
    </property>
</bean>
See also:

Learn more about TriggerOperationHandler using an example application on the Patterns wiki

Default Values of Polling Container Configuration Parameters

The default values for all of the polling container configuration parameters, such as concurrent-consumers, active-when-primary, and receive-timeout, can be found in the JavaDoc (and sources) of the class SimplePollingEventListenerContainer and its superclass, AbstractEventListenerContainer.

For example, the default value of concurrent-consumers is documented in the method SimplePollingEventListenerContainer.setConcurrentConsumers(int).

Notify versus Polling Container

The Polling Container behaves differently from the Notify Container. Comparing only the matching phase, which is somewhat similar in both, is not enough.

Notify Container

The Notify Container is triggered without any feedback loop: the listener may be invoked concurrently by many threads, with no way to throttle them. This may reduce latency, but it can also overload the Space and the client, which in turn increases latency. It may also cause locking issues, and therefore higher latency, when the Notify Container logic needs to update the same data. It also has durability issues (events may be lost on failure); these can be handled with guaranteed notifications, which impose some overhead and additional latency. It does support remote Spaces without major issues.

Polling Container

The Polling Container acts like a queue. With a single concurrent consumer thread, this may impact the overall latency. Still, you control the concurrency, delivery is always guaranteed, and updating objects won’t generate locking. It does not support blocking operations against a remote partitioned Space, because you can’t perform a blocking take with a null routing value against such a Space. This means you should run in non-blocking mode, which may introduce additional latency. Reducing that latency (by using a high-frequency sampling rate) impacts CPU utilization and overall system overhead, because the client performs non-blocking take operations in a round-robin fashion against the different partitions. The more partitions you have, and the less evenly the data is distributed among them, the higher the latency of the Polling Container invocation.

You may construct an array of Polling Containers, each with a specific routing value, which allows you to use a blocking take and thereby reduce the latency.
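
The idea of one consumer per routing value can be sketched with plain Java concurrency utilities. This is an illustration under stated assumptions, not OpenSpaces code: each partition is modeled as a `LinkedBlockingQueue`, and `partitionFor` uses a simple hash-modulo rule chosen for this example only. The actual routing rule of the Space is not taken from this document. `RoutedConsumersSketch` and `consume` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Simulation only; partitions are in-memory queues, not a real Space. */
class RoutedConsumersSketch {

    /** Illustrative routing rule (an assumption): hash modulo the partition count. */
    static int partitionFor(Object routingValue, int partitions) {
        return Math.floorMod(routingValue.hashCode(), partitions);
    }

    /** Starts one blocking consumer per partition and returns per-partition event counts. */
    static int[] consume(List<Integer> routingValues, int partitions) {
        List<BlockingQueue<Integer>> queues = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            queues.add(new LinkedBlockingQueue<>());
        }
        CountDownLatch remaining = new CountDownLatch(routingValues.size());
        AtomicIntegerArray counts = new AtomicIntegerArray(partitions);
        List<Thread> consumers = new ArrayList<>();

        for (int p = 0; p < partitions; p++) {
            final int partition = p;
            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        // Blocking take against exactly one partition: no round-robin sampling
                        queues.get(partition).take();
                        counts.incrementAndGet(partition);
                        remaining.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interrupted on shutdown
                }
            });
            consumer.setDaemon(true);
            consumer.start();
            consumers.add(consumer);
        }

        // Each event lands in the partition selected by its routing value
        for (Integer value : routingValues) {
            queues.get(partitionFor(value, partitions)).add(value);
        }

        boolean drained;
        try {
            drained = remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            drained = false;
        }
        for (Thread consumer : consumers) {
            consumer.interrupt();
        }
        if (!drained) {
            throw new IllegalStateException("consumers did not drain all events in time");
        }
        int[] result = new int[partitions];
        for (int p = 0; p < partitions; p++) {
            result[p] = counts.get(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints "[3, 2, 2]"
        System.out.println(Arrays.toString(consume(List.of(0, 1, 2, 3, 4, 5, 6), 3)));
    }
}
```

Because each consumer blocks on a single queue, an event is picked up as soon as it arrives, without the idle sampling that a round-robin non-blocking take would need.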

See also:

You can find an example here