JTA-XA Example
Author | Product Version | Reference | Download |
---|---|---|---|
Shravan (Sean) Kumar | 8.0.1 | JTA-XA example |
Overview
Integrating GigaSpaces products with an external JMS Server is demonstrated in this page. This example shows how the data grid's event container can process events and send JMS messages to a external JMS server, all under one distributed transaction.
Use of distributed transactions is done as a demonstration. Use this with caution, in production applications this can be expensive and have a performance penalty. Well known patterns like Idempotent Receiver are potential alternatives to distributed transactions.
This example has the following characteristics:
- GigaSpaces updates and JMS message creation are done under transactions so as to avoid duplicate processing/data loss.
- Apache ActiveMQ is used as a the JMS provider.
- Atomikos is used as the JTA Transaction provider and uses the XA protocol.
- Example is based on the GigaSpaces helloworld example included in the product package.
- To demonstrate the XA transaction it rollbacks messages with 100 modulo. You will notice that these messages will never appear in the JMS queue and are rolled back on the GigaSpaces server.
Source Code
<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown" value="false" />
</bean>
<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300" />
</bean>
<bean id="transactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager" />
<property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
<!-- creates an activemq xa connection factory using the amq namespace -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL"><value>tcp://localhost:61616</value></property>
</bean>
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
<property name="physicalName"><value>gigaspaces.helloworld.jms.exampleQueue</value></property>
</bean>
<!-- Configure the JMS connector -->
<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="MY_QUEUE"/>
<property name="xaConnectionFactory" ref="amqConnectionFactory" />
<property name="localTransactionMode" value="false"></property>
</bean>
<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="sessionTransacted" value="true" />
</bean>
<!-- A bean representing a space (an IJSpace implementation). -->
<os-core:embedded-space id="space" name="processorSpace" />
<!-- A wrapper bean to the space to provide OpenSpaces simplified space
API (built on top of IJSpace/JavaSpace). -->
<os-core:giga-space id="gigaSpace" space="space"
tx-manager="transactionManager" />
<!-- A Polling Container bean that performs repeated take operations from
the space of objects matching a defined template. (The take operations are
by default blocking, which means a single take operation is waiting until
a match is found) The template here instructs the polling container to take
objects of type Message with their "info" attribute set to "Hello ". When
a match is found, the object is taken and passed to a listener bean - here
the listener is the previously defined Processor bean. This bean has the
method processMessage(), which is invoked on the taken object, retuning a
processed object. After the object is processed, it is written back to the
space by the Polling Container. -->
<os-events:polling-container id="helloProcessorPollingEventContainer"
giga-space="gigaSpace">
<os-events:tx-support tx-manager="transactionManager" />
<os-core:template>
<bean class="org.openspaces.example.helloworld.common.Message">
<property name="info" value="Hello " />
</bean>
</os-core:template>
<os-events:listener>
<os-events:annotation-adapter>
<os-events:delegate ref="helloProcessor" />
</os-events:annotation-adapter>
</os-events:listener>
</os-events:polling-container>
<!-- The processor bean -->
<bean id="helloProcessor" class="org.openspaces.example.helloworld.processor.Processor">
<property name="jmsTemplate" ref="jmsTemplate" />
<property name="queue" ref="queue"/>
</bean>
Processor Bean and Message Bean definitions
package org.openspaces.example.helloworld.processor;
import java.io.IOException;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.example.helloworld.common.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
/**
* The processor is passed interesting Objects from its associated
* PollingContainer
* <p>
* The PollingContainer removes objects from the GigaSpace that match the
* criteria specified for it.
* <p>
* Once the Processor receives each Object, it modifies its state and returns it
* to the PollingContainer which writes them back to the GigaSpace
* <p/>
* <p>
* The PollingContainer is configured in the pu This is the unit of packaging and deployment in the GigaSpaces Data Grid, and is essentially the main GigaSpaces service. The Processing Unit (PU) itself is typically deployed onto the Service Grid. When a Processing Unit is deployed, a Processing Unit instance is the actual runtime entity..xml file of this project
*/
public class Processor implements InitializingBean {
Logger logger = Logger.getLogger(this.getClass().getName());
private JmsTemplate jmsTemplate;
private Queue queue;
private int msgCtr;
private int rollbacks;
/**
* Process the given Message and return it to the caller. This method is
* invoked using OpenSpaces Events when a matching event occurs.
*
* @throws JMSException
* @throws Exception
*/
@SpaceDataEvent
public Message processMessage(Message msg) throws Exception {
logger.info("Processor PROCESSING: " + msg + " MessageCtr: " + ++msgCtr);
myBusinessLogic(msg);
try {
sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
if (msg.getId() % 100 == 0) {
logger.info("Rolling back: " + msg.getId() + " Rollback Counter: "
+ ++rollbacks);
throw new Exception("Rollback");
}
return msg;
}
private void myBusinessLogic(Message msg) {
msg.setInfo(msg.getInfo() + "World !! Message id: " + msg.getId());
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public Processor() {
logger.info("Processor instantiated, waiting for messages feed...");
}
@Override
public void afterPropertiesSet() throws Exception {
logger.info("In the Processor Bean");
}
/**
* Send to the queue
*/
private void sendMessage(final Message msg) throws IOException,
JMSException {
this.jmsTemplate.send(this.queue, new MessageCreator() {
public javax.jms.Message createMessage(Session session)
throws JMSException {
return session.createTextMessage(msg.getInfo());
}
});
}
}
package org.openspaces.example.helloworld.common;
import com.gigaspaces.annotation.pojo.SpaceRouting;
/**
* A simple object used to work with the Space Where GigaSpaces data is stored. It is the logical cache that holds data objects in memory and might also hold them in layered in tiering. Data is hosted from multiple SoRs, consolidated as a unified data model..
*/
public class Message {
private Integer id;
private String info;
/**
* Necessary Default constructor
*/
public Message() {
}
/**
* Constructs a new Message with the given id and info
* and info.
*/
public Message(Integer id, String info) {
this.id = id;
this.info = info;
}
/**
* The id of this message.
* We will use this attribute to route the message objects when
* they are written to the space, defined in the Message.gs.xml file.
*/
@SpaceRouting
public Integer getId() {
return id;
}
/**
* The id of this message.
*/
public void setId(Integer id) {
this.id = id;
}
/**
* The information this object holds.
*/
public String getInfo() {
return info;
}
/**
* The information this object holds.
*/
public void setInfo(String info) {
this.info = info;
}
/**
* A simple toString to print out the state.
*/
public String toString() {
return "id[" + id + "] info[" + info +"]";
}
}
Running the Example
-
Download Apache ActiveMQ from here.
-
Download Atomikos TransactionEssentials from here.
-
Extract the example archive into a folder (calling it <helloworld-jta>). Modify the setDevEnv.bat and build.properties files to have the proper paths for GigaSpaces home, Java home, ActiveMQ home and Atomikos home. Also modify the NIC_ADDR and locators variable to have the proper IP address.
-
Open a command shell and navigate to the <helloworld-jta> folder.
-
Run the
setDevEnv.bat
script in the <helloworld-jta> folder to set the environment variables. -
Copy the required JARs to the <helloworld-jta>\lib folder using the
copy-libs
ant task provided.build copy-libs
The example was tested using the following product versions:
- GigaSpaces - 8.0.1
- Apache ActiveMQ - 5.5
- Atomikos TransactionEssentials - 3.7.0
If you are using different versions, please make sure all the equivalent JARs are reflected in the
copy-libs
ant task. -
Start a gs-ui instance using
gs-ui.bat
script in the <helloworld-jta>> folder. -
Run
gs-agent.bat
<helloworld-jta> folder, to start the data grid components (GSA Grid Service Agent. This is a process manager that can spawn and manage Service Grid processes (Operating System level processes) such as The Grid Service Manager, The Grid Service Container, and The Lookup Service. Typically, the GSA is started with the hosting machine's startup. Using the agent, you can bootstrap the entire cluster very easily, and start and stop additional GSCs, GSMs and lookup services at will.,GSM Grid Service Manager. This is is a service grid component that manages a set of Grid Service Containers (GSCs). A GSM has an API for deploying/undeploying Processing Units. When a GSM is instructed to deploy a Processing Unit, it finds an appropriate, available GSC and tells that GSC to run an instance of that Processing Unit. It then continuously monitors that Processing Unit instance to verify that it is alive, and that the SLA is not breached., LUS Lookup Service. This service provides a mechanism for services to discover each other. Each service can query the lookup service for other services, and register itself in the lookup service so other services may find it., GSM). -
Start the ActiveMQ process using the <ActiveMQHome>
\bin\activemq.bat
script.If ActiveMQ is running on another server, remember to update the brokerURL in the
pu.xml
file. -
Deploy the processorSpace cluster by running the
deploy-processor
ant task.build deploy-processor
-
Run the feeder process using the
run-feeder
ant task.build run-feeder
-
If you check the data grid logs, you will notice that the Message-0 (id=0) is rolled back and all other messages are processed successfully and sent to the JMS server.
Message-0 (id=0) will keep going back to the Polling container logic, because the space update and JMS message both are rolled back. This is done intentionally to demonstrate XA.
-
You can validate the JMS messages received by the Queue using a test JMS client included. You can run the client using the
jms-client
ant task.build jms-client
References
- JTA/XA support information, Transaction Management.
- XA transactions using Spring, http://www.javaworld.com/javaworld/jw-04-2007/jw-04-xa.html.
- Distributed transactions in Spring, with and without XA, http://www.javaworld.com/javaworld/jw-01-2009/jw-01-spring-transactions.html.
- Atomikos TransactionEssentials Spring Integration information, http://www.atomikos.com/Documentation/SpringIntegration.