JTA-XA Example

Author Product Version Reference Download
Shravan (Sean) Kumar 8.0.1 JTA-XA example


This page demonstrates how to integrate GigaSpaces products with an external JMS server. The example shows how the data grid's event container can process events and send JMS messages to an external JMS server, all under one distributed transaction.

Distributed transactions are used here for demonstration purposes. Use them with caution: in production applications they can be expensive and carry a performance penalty. Well-known patterns such as Idempotent Receiver are potential alternatives to distributed transactions.
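As a rough illustration of the Idempotent Receiver alternative mentioned above, the following sketch remembers the ids it has already handled and ignores redeliveries. It is not part of the example archive: the class name, the method names and the in-memory id set are all assumptions made for illustration, and a real receiver would keep the processed ids in a durable store.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of the Idempotent Receiver pattern.
 * Nothing here is GigaSpaces, JMS or Atomikos API.
 */
final class IdempotentReceiverSketch {

    // Ids of messages that were already handled (assumed to be in memory
    // here; a real receiver would persist them).
    private final Set<Integer> processedIds = new HashSet<>();

    // Stand-in for a side effect that must not happen twice.
    private final List<String> sideEffects = new ArrayList<>();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean receive(int messageId, String payload) {
        // Set.add returns false when the id is already present.
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery, ignore it
        }
        sideEffects.add(payload);
        return true;
    }

    List<String> sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentReceiverSketch receiver = new IdempotentReceiverSketch();
        receiver.receive(1, "Hello World 1");
        receiver.receive(1, "Hello World 1"); // redelivered after a failure
        receiver.receive(2, "Hello World 2");
        // Only two side effects are recorded, although three deliveries arrived.
        System.out.println(receiver.sideEffects().size());
    }
}
```

With a receiver like this, a sender can simply retry after a failure, and the duplicate is filtered on the receiving side instead of being prevented by a distributed transaction.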

This example has the following characteristics:

  • GigaSpaces updates and JMS message creation are done under transactions so as to avoid duplicate processing/data loss.
  • Apache ActiveMQ is used as the JMS provider.
  • Atomikos is used as the JTA Transaction provider and uses the XA protocol.
  • Example is based on the GigaSpaces helloworld example included in the product package.
  • To demonstrate the XA transaction, the example rolls back every message whose id is a multiple of 100 (id % 100 == 0). These messages never appear in the JMS queue, and their updates are rolled back on the GigaSpaces server.
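The all-or-nothing behaviour described in the list above can be sketched in plain Java with two in-memory resources standing in for the space and the JMS queue. This is a simplified illustration, not Atomikos or XA API: the class and method names are hypothetical, and a real XA transaction manager runs a prepare phase before committing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: two resources that commit or roll back together. */
final class XaRollbackSketch {

    /** A resource that buffers work until the transaction outcome is known. */
    static final class BufferedResource {
        private final List<String> committed = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        void write(String value) { pending.add(value); }
        void commit() { committed.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
        List<String> committed() { return committed; }
    }

    /** Processes one message id against both resources as one unit of work. */
    static void process(int id, BufferedResource space, BufferedResource queue) {
        try {
            space.write("processed-" + id);
            queue.write("Hello World !! Message id: " + id);
            // Same rule as the example: ids divisible by 100 are rolled back.
            if (id % 100 == 0) {
                throw new IllegalStateException("Rollback");
            }
            space.commit();
            queue.commit();
        } catch (IllegalStateException e) {
            // Both resources discard their pending work.
            space.rollback();
            queue.rollback();
        }
    }

    public static void main(String[] args) {
        BufferedResource space = new BufferedResource();
        BufferedResource queue = new BufferedResource();
        for (int id = 99; id <= 101; id++) {
            process(id, space, queue);
        }
        // The queue holds the messages for ids 99 and 101 only.
        System.out.println(queue.committed());
    }
}
```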

 Source Code

<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close">
    <property name="forceShutdown" value="false" />
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>

<!-- Spring JTA transaction manager backed by the two Atomikos beans above -->
<bean id="transactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

<!-- Creates an ActiveMQ XA connection factory -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"><value>tcp://localhost:61616</value></property>
</bean>

<!-- The JMS queue that receives the processed messages -->
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
    <property name="physicalName"><value>gigaspaces.helloworld.jms.exampleQueue</value></property>
</bean>

<!-- Configure the JMS connector -->
<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="MY_QUEUE"/>
    <property name="xaConnectionFactory" ref="amqConnectionFactory" />
    <property name="localTransactionMode" value="false"></property>
</bean>

<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="sessionTransacted" value="true" />
</bean>

<!-- A bean representing a space (an IJSpace implementation). -->
<os-core:embedded-space id="space" name="processorSpace" />

<!-- A wrapper bean to the space to provide OpenSpaces simplified space
    API (built on top of IJSpace/JavaSpace). -->
<os-core:giga-space id="gigaSpace" space="space"
    tx-manager="transactionManager" />

<!-- A Polling Container bean that performs repeated take operations from
    the space of objects matching a defined template. (The take operations are
    by default blocking, which means a single take operation waits until
    a match is found.) The template here instructs the polling container to take
    objects of type Message with their "info" attribute set to "Hello ". When
    a match is found, the object is taken and passed to a listener bean. Here
    the listener is the Processor bean defined below. This bean has the
    method processMessage(), which is invoked on the taken object, returning a
    processed object. After the object is processed, it is written back to the
    space by the Polling Container. -->
<os-events:polling-container id="helloProcessorPollingEventContainer"
    giga-space="gigaSpace">
    <os-events:tx-support tx-manager="transactionManager" />
    <os-core:template>
        <bean class="org.openspaces.example.helloworld.common.Message">
            <property name="info" value="Hello " />
        </bean>
    </os-core:template>
    <os-events:listener>
        <os-events:annotation-adapter>
            <os-events:delegate ref="helloProcessor" />
        </os-events:annotation-adapter>
    </os-events:listener>
</os-events:polling-container>

<!-- The processor bean -->
<bean id="helloProcessor" class="org.openspaces.example.helloworld.processor.Processor">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="queue" ref="queue"/>
</bean>

Processor Bean and Message Bean definitions

package org.openspaces.example.helloworld.processor;

import java.io.IOException;
import java.util.logging.Logger;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;

import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.example.helloworld.common.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * The processor is passed interesting Objects from its associated
 * PollingContainer
 * <p>
 * The PollingContainer removes objects from the GigaSpace that match the
 * criteria specified for it.
 * <p>
 * Once the Processor receives each Object, it modifies its state and returns it
 * to the PollingContainer which writes them back to the GigaSpace
 * <p>
 * The PollingContainer is configured in the pu.xml file of this project
 */
public class Processor implements InitializingBean {
    Logger logger = Logger.getLogger(this.getClass().getName());
    private JmsTemplate jmsTemplate;
    private Queue queue;

    private int msgCtr;
    private int rollbacks;

    /**
     * Process the given Message and return it to the caller. This method is
     * invoked using OpenSpaces Events when a matching event occurs.
     *
     * @throws JMSException
     * @throws Exception
     */
    @SpaceDataEvent
    public Message processMessage(Message msg) throws Exception {
        logger.info("Processor PROCESSING: " + msg + " MessageCtr: " + ++msgCtr);

        try {
            // Update the message and send it to JMS inside the XA transaction
            myBusinessLogic(msg);
            sendMessage(msg);
        } catch (Exception e) {
            throw e;
        }

        // Force a rollback for ids divisible by 100 to demonstrate XA
        if (msg.getId() % 100 == 0) {
            logger.info("Rolling back: " + msg.getId() + " Rollback Counter: "
                    + ++rollbacks);
            throw new Exception("Rollback");
        }
        return msg;
    }

    private void myBusinessLogic(Message msg) {
        msg.setInfo(msg.getInfo() + "World !! Message id: " + msg.getId());
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public Processor() {
        logger.info("Processor instantiated, waiting for messages feed...");
    }

    public void afterPropertiesSet() throws Exception {
        logger.info("In the Processor Bean");
    }

    /**
     * Send to the queue
     */
    private void sendMessage(final Message msg) throws IOException,
            JMSException {

        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public javax.jms.Message createMessage(Session session)
                    throws JMSException {
                return session.createTextMessage(msg.getInfo());
            }
        });
    }
}

package org.openspaces.example.helloworld.common;

import com.gigaspaces.annotation.pojo.SpaceRouting;

/**
 * A simple object used to work with the Space.
 */
public class Message {

    private Integer id;
    private String info;

    /**
     * Necessary Default constructor
     */
    public Message() {
    }

    /**
     * Constructs a new Message with the given id and info.
     */
    public Message(Integer id, String info) {
        this.id = id;
        this.info = info;
    }

    /**
     * The id of this message.
     * We will use this attribute to route the message objects when
     * they are written to the space, defined in the Message.gs.xml file.
     */
    @SpaceRouting
    public Integer getId() {
        return id;
    }

    /** The id of this message. */
    public void setId(Integer id) {
        this.id = id;
    }

    /** The information this object holds. */
    public String getInfo() {
        return info;
    }

    /** The information this object holds. */
    public void setInfo(String info) {
        this.info = info;
    }

    /** A simple toString to print out the state. */
    public String toString() {
        return "id[" + id + "] info[" + info + "]";
    }
}

 Running the Example

  1. Download Apache ActiveMQ from here.

  2. Download Atomikos TransactionEssentials from here.

  3. Extract the example archive into a folder (referred to below as <helloworld-jta>). Modify the setDevEnv.bat and build.properties files so that they contain the correct paths for GigaSpaces home, Java home, ActiveMQ home and Atomikos home. Also set the NIC_ADDR and locators variables to the correct IP address.

  4. Open a command shell and navigate to the <helloworld-jta> folder.

  5. Run the setDevEnv.bat script in the <helloworld-jta> folder to set the environment variables.

  6. Copy the required JARs to the <helloworld-jta>\lib folder using the copy-libs ant task provided.

     build copy-libs


    The example was tested using the following product versions:

    • GigaSpaces - 8.0.1
    • Apache ActiveMQ - 5.5
    • Atomikos TransactionEssentials - 3.7.0

    If you are using different versions, please make sure all the equivalent JARs are reflected in the copy-libs ant task.

  7. Start a gs-ui instance using the gs-ui.bat script in the <helloworld-jta> folder.

  8. Run gs-agent.bat from the <helloworld-jta> folder to start the data grid components (GSA, GSM, LUS, GSC).

  9. Start the ActiveMQ process using the <ActiveMQHome>\bin\activemq.bat script.

    If ActiveMQ is running on another server, remember to update the brokerURL in the pu.xml file.

  10. Deploy the processorSpace cluster by running the deploy-processor ant task.

    build deploy-processor
  11. Run the feeder process using the run-feeder ant task.

     build run-feeder
  12. If you check the data grid logs, you will notice that Message-0 (id=0) is rolled back, while all other messages are processed successfully and sent to the JMS server.

    Message-0 (id=0) keeps returning to the polling container logic, because both the space update and the JMS message are rolled back. This is done intentionally to demonstrate XA.

  13. You can validate the JMS messages received by the Queue using a test JMS client included. You can run the client using the jms-client ant task.

     build jms-client
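
The redelivery noted in step 12 can be pictured with a small plain-Java simulation. It is only a sketch under stated assumptions: the deque stands in for the unprocessed objects in the space, the class and method names are hypothetical, and no GigaSpaces API is involved. A rolled-back take puts the object back, so the next poll finds it again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a polling loop whose rollbacks undo the take. */
final class RedeliverySketch {

    /** Polls up to maxPolls times and returns the number of attempts per id. */
    static Map<Integer, Integer> poll(Deque<Integer> unprocessed, int maxPolls) {
        Map<Integer, Integer> attempts = new HashMap<>();
        for (int i = 0; i < maxPolls && !unprocessed.isEmpty(); i++) {
            Integer id = unprocessed.pollFirst();   // take under a transaction
            attempts.merge(id, 1, Integer::sum);
            if (id % 100 == 0) {
                unprocessed.addLast(id);            // rollback: the take is undone
            }
            // Otherwise the transaction commits and the object stays removed.
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<Integer> unprocessed = new ArrayDeque<>();
        unprocessed.add(0);
        unprocessed.add(1);
        unprocessed.add(2);
        // Id 0 is attempted on every poll it is offered; ids 1 and 2 only once.
        System.out.println(poll(unprocessed, 6));
    }
}
```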