Event Container Transport
The GigaSpaces event container transport uses the OpenSpaces event components (namely, the polling container and the notify container) to receive POJO messages using the Space as the transport layer. It can also send POJO messages using the Space as the transport layer.
Since Mule configuration uses Spring, the event container transport can be easily integrated with OpenSpaces event components.
To use the event container transport with XML namespaces, define the following XML headers:
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:os-events="http://www.openspaces.org/schema/events"
xmlns:os-core="http://www.openspaces.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:os-eventcontainer="http://www.openspaces.org/schema/mule/os-eventcontainer"
xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio"
xsi:schemaLocation="http://www.openspaces.org/schema/core http://www.openspaces.org/schema/16.1/core/openspaces-core.xsd
http://www.openspaces.org/schema/events http://www.openspaces.org/schema/16.1/events/openspaces-events.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.7.0/mule.xsd
http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/3.7.0/mule-stdio.xsd
http://www.openspaces.org/schema/mule/os-eventcontainer http://www.openspaces.org/schema/16.1/mule/3.7.0/mule-os-eventcontainer.xsd">
<!-- Mule configuration comes here ... -->
</mule>
To configure OpenSpaces inbound or outbound components within the Mule configuration file, the Space must be configured first. Pure Spring beans are declared in the Mule configuration file between spring:beans tags. The space and giga-space beans are declared just as in any typical OpenSpaces Spring configuration, for example:
<spring:beans>
<!--
A bean representing a space (an IJSpace implementation).
-->
<os-core:embedded-space id="space" space-name="mySpace" lookup-groups="${user.name}"/>
<!--
OpenSpaces simplified space API built on top of IJSpace/JavaSpace.
-->
<os-core:giga-space id="gigaSpace" space="space"/>
....
....
</spring:beans>
When configuring inbound or outbound OpenSpaces event container endpoints, an OpenSpacesConnector must also be configured explicitly. This connector acts as a bridge between the inbound and outbound transports and the Spring application context. The application context is then used to get a reference to the GigaSpace bean and the specific polling or notify container beans.
<os-eventcontainer:connector name="gigaSpacesConnector"/>
Inbound Component
The OpenSpaces inbound component is based on either the polling event container or the notify event container, and registers itself as a listener on one of them. Here is an example of configuring a polling event container within the spring:beans section of the Mule definition:
<spring:beans>
......
......
<os-events:polling-container id="helloPollingEventContainer" giga-space="gigaSpace">
<os-core:template>
<spring:bean class="org.openspaces.itest.esb.mule.SimpleMessage">
<spring:property name="read" value="false"/>
</spring:bean>
</os-core:template>
</os-events:polling-container>
</spring:beans>
Unlike a common polling/notify container configuration, this container has no event listener, because the inbound transport itself acts as the event container listener.
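The same listener-less pattern should apply when the inbound component is based on a notify container instead of a polling container. The following is a minimal sketch, not a tested configuration: the bean id helloNotifyEventContainer is a hypothetical name chosen for illustration, and the snippet assumes the gigaSpace bean declared earlier.

```xml
<spring:beans>
    <!-- Hypothetical bean id; assumes the "gigaSpace" bean is declared as shown earlier. -->
    <os-events:notify-container id="helloNotifyEventContainer" giga-space="gigaSpace">
        <!-- Same template as the polling example: match messages that were not read yet. -->
        <os-core:template>
            <spring:bean class="org.openspaces.itest.esb.mule.SimpleMessage">
                <spring:property name="read" value="false"/>
            </spring:bean>
        </os-core:template>
        <!-- No listener element: the inbound transport registers itself as the listener. -->
    </os-events:notify-container>
</spring:beans>
```

An inbound endpoint would then refer to this container by its bean id, in the same way as for the polling container.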
We then configure the inbound transport as shown in the example below. The os-eventcontainer://<eventContainerBeanId> address means that this is an inbound endpoint that receives messages from the OpenSpaces polling/notify container registered under the bean name <eventContainerBeanId>. Here is an example that uses the helloPollingEventContainer defined above.
<model name="helloSample">
<service name="MessageReaderUMO">
<inbound>
<os-eventcontainer:inbound-endpoint address="os-eventcontainer://helloPollingEventContainer"/>
</inbound>
<!-- outbound definition ... -->
</service>
</model>
Threading Model
By default, the inbound transport uses the threading model defined in the event container definition. There is an option to set workManager=true, which offloads the processing of a request to a managed Mule thread. This is currently not recommended.
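As a sketch of what the default threading model can look like, the polling container below raises the number of consumer threads through the polling container's concurrent-consumers and max-concurrent-consumers attributes. The values 2 and 5 are illustrative assumptions, and the snippet assumes the gigaSpace bean and SimpleMessage template shown earlier.

```xml
<spring:beans>
    <!-- Illustrative values: start with 2 consumer threads and scale up to 5 under load. -->
    <os-events:polling-container id="helloPollingEventContainer" giga-space="gigaSpace"
                                 concurrent-consumers="2" max-concurrent-consumers="5">
        <os-core:template>
            <spring:bean class="org.openspaces.itest.esb.mule.SimpleMessage">
                <spring:property name="read" value="false"/>
            </spring:bean>
        </os-core:template>
    </os-events:polling-container>
</spring:beans>
```

With this setup, the inbound endpoint is invoked on the container's own consumer threads, so the Mule work manager is not involved.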
Outbound Component
The outbound endpoint is used for updating or writing POJO messages back to the Space.
We configure the outbound endpoint as shown in the example below. The os-eventcontainer://<gigaSpaceBeanId> address means that this is an outbound endpoint that sends (writes) messages to the Space using the GigaSpace bean registered under the bean name <gigaSpaceBeanId>. Here is an example:
<model name="helloSample">
<service name="MessageReaderUMO">
<!-- inbound definitions -->
<outbound>
<outbound-pass-through-router>
<os-eventcontainer:outbound-endpoint address="os-eventcontainer://gigaSpace"/>
</outbound-pass-through-router>
</outbound>
</service>
</model>
You can optionally configure the writeLease (defaults to FOREVER), updateOrWrite (defaults to true), and updateTimeout (defaults to 0) parameters.
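The text above does not show where these parameters are set. The sketch below assumes they are passed as endpoint properties through the address query string, which is the usual Mule convention for endpoint properties; treat this placement, the example values, and the millisecond units as assumptions to verify against your OpenSpaces version.

```xml
<outbound>
    <outbound-pass-through-router>
        <!-- Assumption: parameters are read from the address query string.
             Note that "&" must be escaped as "&amp;" inside an XML attribute.
             Example values: 60000 (lease) and 1000 (update timeout), assumed to be milliseconds. -->
        <os-eventcontainer:outbound-endpoint
            address="os-eventcontainer://gigaSpace?writeLease=60000&amp;updateOrWrite=true&amp;updateTimeout=1000"/>
    </outbound-pass-through-router>
</outbound>
```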
Transaction Support
Since the inbound component is based on the event containers, transactions can be enabled through the event container's own transaction support; see the polling container documentation for details.
Any operation performed using the GigaSpace interface joins the ongoing transaction started by the event container using its support for declarative transactions.
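A minimal sketch of enabling transactions on the polling container itself, using the container's tx-support element. The bean ids (space, transactionManager, gigaSpace, helloPollingEventContainer) follow the names used elsewhere in this section; adapt them to your own configuration.

```xml
<spring:beans>
    <os-core:embedded-space id="space" space-name="mySpace"/>
    <os-core:distributed-tx-manager id="transactionManager"/>
    <os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>

    <os-events:polling-container id="helloPollingEventContainer" giga-space="gigaSpace">
        <!-- The container starts a transaction for each receive operation. -->
        <os-events:tx-support tx-manager="transactionManager"/>
        <os-core:template>
            <spring:bean class="org.openspaces.itest.esb.mule.SimpleMessage">
                <spring:property name="read" value="false"/>
            </spring:bean>
        </os-core:template>
    </os-events:polling-container>
</spring:beans>
```

Because the gigaSpace bean is bound to the same transaction manager, operations performed through it while the event is being processed take part in the container's transaction.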
This means that any outbound component operating within a Spring-managed transaction automatically joins the transaction, since it uses GigaSpace. For outbound components that are used with a different inbound component (such as JMS), the Mule Spring transaction manager can be used. Here is an example (note the custom-transaction tag in the inbound and outbound endpoints):
<spring:beans>
<os-core:embedded-space id="space" space-name="mySpace"/>
<os-core:distributed-tx-manager id="transactionManager" />
<os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>
<spring:bean id="transactionFactory"
class="org.mule.module.spring.transaction.SpringTransactionFactory">
<spring:property name="manager">
<spring:ref bean="transactionManager"/>
</spring:property>
</spring:bean>
</spring:beans>
<model name="helloSample">
<service name="MessageReaderUMO">
<inbound>
<os-eventcontainer:inbound-endpoint
address="os-eventcontainer://helloPollingEventContainer">
<custom-transaction factory-ref="transactionFactory" action="BEGIN_OR_JOIN"/>
</os-eventcontainer:inbound-endpoint>
</inbound>
<component class="org.openspaces.itest.esb.mule.eventcontainer.tx.TxRollBackMessageReader"/>
<outbound>
<outbound-pass-through-router>
<os-eventcontainer:outbound-endpoint giga-space="gigaSpace">
<custom-transaction factory-ref="transactionFactory" action="BEGIN_OR_JOIN"/>
</os-eventcontainer:outbound-endpoint>
</outbound-pass-through-router>
</outbound>
</service>
</model>
In the above example, the Mule transaction factory is Spring-based and wraps the Spring PlatformTransactionManager. For more information regarding OpenSpaces support for transactions (including XA), see the OpenSpaces Core Component - Transaction Manager section.
Full Example
In this example, POJO messages (SimpleMessage) are received from the Space with their read flag set to false. The MessageReader service then processes the messages (among other operations, it sets the read flag to true). Finally, the MessageReader result is written back to the Space.
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:os-events="http://www.openspaces.org/schema/events"
xmlns:os-core="http://www.openspaces.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:os-eventcontainer="http://www.openspaces.org/schema/mule/os-eventcontainer"
xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio"
xsi:schemaLocation="http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.7.0/mule.xsd
http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/3.7.0/mule-stdio.xsd
http://www.openspaces.org/schema/mule/os-eventcontainer http://www.openspaces.org/schema/16.1/mule/3.7.0/mule-os-eventcontainer.xsd">
<description>Tests mule connector, receive and process single object at a time.</description>
<spring:beans>
<os-core:embedded-space id="space" space-name="space" lookup-groups="${user.name}"/>
<os-core:giga-space id="gigaSpace" space="space"/>
<os-events:polling-container id="helloPollingEventContainer" giga-space="gigaSpace">
<os-core:template>
<spring:bean class="org.openspaces.itest.esb.mule.SimpleMessage">
<spring:property name="read" value="false"/>
</spring:bean>
</os-core:template>
</os-events:polling-container>
</spring:beans>
<!-- This connector must be declared in order to inject the Spring application context -->
<os-eventcontainer:connector name="gigaSpacesConnector"/>
<!--The Mule model initialises and manages your UMO components-->
<model name="helloSample">
<service name="MessageReaderUMO">
<inbound>
<os-eventcontainer:inbound-endpoint address="os-eventcontainer://helloPollingEventContainer"/>
</inbound>
<component class="org.openspaces.itest.esb.mule.MessageReader"/>
<outbound>
<outbound-pass-through-router>
<os-eventcontainer:outbound-endpoint
address="os-eventcontainer://gigaSpace"/>
</outbound-pass-through-router>
</outbound>
</service>
</model>
</mule>
Message Metadata
The OpenSpaces Mule event transport supports sending and receiving metadata using Mule. Since the messages are pure POJOs, they should implement the MessageHeader interface in order to carry metadata. The OpenSpaces Mule integration comes with a simple base class, AbstractMessageHeader, that already implements the MessageHeader interface.
Transformer
To transfer the metadata from the UMOMessage to the outgoing message via the OpenSpaces outbound endpoint, the transport is configured with a default outbound transformer called org.openspaces.esb.mule.transformers.OpenSpacesTransformer. The transformer copies the metadata from the UMOMessage to the transformed object, and assumes that the returned object is the payload contained within the UMOMessage.
If the returned object is not the payload within the UMOMessage, override the transformer's getResultPayload(UMOMessage, String) method so that it returns the new transformed object.
Here is an example:
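// Custom transformer that returns a new object instead of the original payload.
public class MessageTransformer extends OpenSpacesTransformer {
    @Override
    protected Object getResultPayload(UMOMessage message, String outputEncoding) {
        // The payload carried by the Mule message.
        Message msg = (Message) message.getPayload();
        // Build the object that is actually written to the Space.
        ProcessedMessage pMsg = new ProcessedMessage(msg.getMessage());
        pMsg.setProperty("name", "processed message " + msg.getMessage());
        return pMsg;
    }
}

// Extending AbstractMessageHeader lets the POJO carry the message metadata.
public class ProcessedMessage extends AbstractMessageHeader implements Message {
    private String message;
    ....
    ....
}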
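To use a custom transformer such as MessageTransformer, it has to be attached to the outbound endpoint. The sketch below assumes that the OpenSpaces outbound endpoint accepts the standard Mule transformer-refs attribute and that the transformer is registered with Mule's custom-transformer element. The package com.example and the name messageTransformer are hypothetical.

```xml
<!-- Hypothetical package and name; point the class attribute at your own transformer. -->
<custom-transformer name="messageTransformer" class="com.example.MessageTransformer"/>

<model name="helloSample">
    <service name="MessageReaderUMO">
        <!-- inbound definitions -->
        <outbound>
            <outbound-pass-through-router>
                <!-- Assumption: transformer-refs replaces the default OpenSpacesTransformer. -->
                <os-eventcontainer:outbound-endpoint address="os-eventcontainer://gigaSpace"
                                                     transformer-refs="messageTransformer"/>
            </outbound-pass-through-router>
        </outbound>
    </service>
</model>
```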