Summary: This example demonstrates an SBA scenario of stock processing.
The example contains 3 processing units, a feeder, a processor, and a mirror service.
This SBA application can easily scale using GigaSpaces Management tools.
Example Root <GigaSpaces Root>\Examples\StockSBA

Overview

This example demonstrates the usage of the .NET Processing Unit Container persistent space using the nHibernate External Data Source implementation and a mirror service.

Architecture

This example includes a module that is deployed to the grid, and a domain model that consists of Data objects. The XAP66:DataFeeder\ module runs within a Processing Unit and writes Data objects with raw data into the remote space. The space is actually embedded within the other Processing Unit, which runs the XAP66:DataProcessor\ module.

The DataProcessor service takes the new Data objects, processes the raw data and writes them back to the space.

The example solution is based on three projects:

  1. Entries - holds the Data object
  2. Feeder - holds the DataFeeder processing unit
  3. Processor - holds the DataProcessor processing unit and related classes

Application Workflow

  1. The XAP66:DataFeeder\ writes non-processed Data objects into the space every second.
  2. The XAP66:DataProcessor\ takes non-processed Data objects, processes and writes the processed Data objects back to the space.

Data Domain Model

The only object in our model is the Data object.

[SpaceClass]
public class Data
{
[..]
  /// <summary>
  /// Gets the data type, used as the routing index inside the cluster
  /// </summary>
  [SpaceRouting]
  public Nullable<int> Type
  {
    get { return _type; }
    set { _type = value; }
  }
  /// <summary>
  /// Gets the data info
  /// </summary>
  public string Info
  {
    get { return _info; }
    set { _info = value; }
  }
  /// <summary>
  /// Gets or sets the data processed state
  /// </summary>
  public bool Processed
  {
    get { return _processed; }
    set { _processed = value; }
  }
[..]
}

Note the attributes that are used in this object:

  • SpaceClass - the marked object is written into a space.
  • SpaceRouting - when using a partitioned cluster topology, Data objects are routed to the appropriate partitions according to the specified attribute, in this case type.

Basically, every Data object is written to the space by the DataFeeder with the processed value set to false, which is later set to true by the DataProcessor.

DataProcessor

Code
///
/// Process given data from cluster and write the processed data back to the cluster.
/// This processing unit is also responsible for starting up the space it will process the
/// data from.
///
public class DataProcessor : IProcessingUnit
{
[..]
///
///Initializes the processing unit
///
///Cluster info related to this processing unit
///Gets custom properties related to the processing unit
public void Init(ClusterInfo clusterInfo, IDictionarycustomProperties)
{
//Get the space name from the properties
string spaceUrl;
customProperties.TryGetValue(Consts.SpaceUrlProperty, out spaceUrl);
if (String.IsNullOrEmpty(spaceUrl))
throw new ArgumentNullException("spaceUrl", "Custom Properties must contain a space name property (" + Consts.SpaceUrlProperty + ")");
//Gets the use transaction state from the custom properties
string useTransactionStr;
customProperties.TryGetValue(Consts.UseTransactionsProperty, out useTransactionStr);
_useTransactions = (String.IsNullOrEmpty(useTransactionStr) ? false : bool.Parse(useTransactionStr));
//Starts the space as a part of the cluster, the DataProcessor takes data from its embedded space only
SpaceConfig spaceConfig = new SpaceConfig();
spaceConfig.ClusterInfo = clusterInfo;
ISpaceProxy clusterProxy = SpaceProxyProviderFactory.Instance.FindSpace(spaceUrl, spaceConfig);
//Gets the direct proxy from the cluster proxy that will be used by this processing unit
_proxy = clusterProxy.GetServerAdmin().GetDirectProxy();
}

///
///Starts the processing unit
///
public void Start()
{
//Create template to look for data that needs to be processed
Data template = new Data();
template.Processed = false;
//Creates a ExampleProcessor instance
ExampleProcessor myProcessor = new ExampleProcessor();
//Starts a polling container using the given template
_pollingContainer = new ExamplePollingContainer(_proxy, template, _useTransactions, myProcessor.ProcessData);

Console.Write("DataProcessor started, waiting for data that needs processing...");
}

///
///Stops the processing unit
///
public void Stop()
{
//Dispose the container, this stops the container work and cleanup used resources
if (_pollingContainer != null)
{
_pollingContainer.Dispose();
_pollingContainer = null;
}
}

///
///Destroys the processing unit, any allocated resources should be cleaned up in this method
///
public void Destroy()
{
if (_pollingContainer != null)
_pollingContainer.Dispose();
//This processing unit created the space, therefor it should also close it
//by calling Shutdown on the server admin
_proxy.GetServerAdmin().Shutdown();
//Dispose the space proxy. to clean up any used resources
_proxy.Dispose();
}
}

Configuration
/./dotnetProcessingUnitDataExample
false

The actual work is being done by the ExamplePollingContainer and the ExampleProcessor, the polling container is actually taking the unprocessed Data objects from the space and executes the
ExampleProcessor.ProcessData method on it. Then it writes back the processes data back to the space

On Init, the Processing Unit is starts the embedded space with the given ClusterInfo

Another important thing to note is that the Processing Unit explicitly calls _proxy.GetServerAdmin().Shutdown(); on Destroy(), which shuts down the embedded space. Otherwise, the space remains active even when this Processing Unit is undeployed.

ExampleProcessor

The example processor is the business logic behind the data processor, it contains the actual processing method that will be called by the polling container.

/// <summary>
/// This class contain the processing logic
/// </summary>
internal class ExampleProcessor
{
  [..]
  /// <summary>
  /// Receives a data and process it
  /// </summary>
  /// <param name="data">Data received</param>
  /// <returns>Processed data</returns>
  public Data ProcessData(Data data)
  {
    Console.WriteLine("**** processing - Took element with info: " + data.Info);
    //Process data...
    Thread.Sleep(ProcessingTime);
    //Set data state to processed
    data.Processed = true;
    Console.WriteLine("**** processing - done");
    return data;
  }
}

ExamplePollingContainer

The polling container is executing a polling routine and execute a process method on any polled object and writes back the result to the space.

/// <summary>
/// A method that gets a polled object, process it and returns the processing result as an object
/// </summary>
/// <typeparam name="T">Type of object to poll</typeparam>
/// <typeparam name="P">Type of object returned by processing</typeparam>
/// <param name="polledObject">The object that was polled</param>
/// <returns>Object result of processing that will be written back to the space</returns>
public delegate P Process<T, P>(T polledObject);

/// <summary>
/// Executes polling logic on a given space
/// </summary>
/// <typeparam name="T">Type of object to poll</typeparam>
/// <typeparam name="P">Type of object returned by processing</typeparam>
public class ExamplePollingContainer<T, P> : IDisposable
{
[..]

  public ExamplePollingContainer(ISpaceProxy proxy, object template, bool useTransaction, Process<T, P> processMethod)
  {
    //Initialize variables
    _proxy = proxy;
    _useTransaction = useTransaction;
    _processMethod = processMethod;
    _signalShouldWork = new ManualResetEvent(false);
    _isDisposed = false;
    //Create IPreparedTemplate from the given template
    _template = proxy.Snapshot(template);
    //Get current space mode
    _spaceMode = _proxy.GetServerAdmin().SpaceMode;
    //Register for space mode changed events that will start/stop the work according to the
    //space mode (Work only in primary)
    _proxy.GetServerAdmin().SpaceModeChanged += PollingContainer_SpaceModeChanged;
    //Creates a new thread for the polling action and start it
    _pollingThread = new Thread(new ThreadStart(DoPoll));
    _pollingThread.Start();
  }

  /// <summary>
  /// Switch working state according to the new space mode
  /// </summary>
  /// <param name="sender"></param>
  /// <param name="e"></param>
  private void PollingContainer_SpaceModeChanged(object sender, SpaceModeEventArgs e)
  {
    //Update the current space mode
    _spaceMode = e.Mode;
    //If it is primary, signal the polling thread to start working
    if (_spaceMode == SpaceMode.Primary)
      _signalShouldWork.Set();
    //else signal it to stop working
    else
      _signalShouldWork.Reset();
  }

  private void DoPoll()
  {
    //This method must be called on each new thread that is created, it is
    //used to set the context class loader on the JVM
    SpaceProxyProviderFactory.Instance.SetThreadContext(_proxy);
    while (!_isDisposed)
    {
      //Work only if the space is in primary mode
      if (_spaceMode == SpaceMode.Primary)
      {
        try
        {
          //Creates a local transaction if required
          ITransaction tx = _useTransaction ? _proxy.CreateLocalTransaction() : null;
          try
          {
            //Executes the actual poll by Take space operation
            T polledObject = _proxy.Take<T>(_template, tx, _timeout);
            if (polledObject != null)
            {
              //If an object was polled executes the process method on it
              P processedObject = _processMethod(polledObject);
              //If the process method returned a value, write it back to the space
              if (processedObject != null)
                _proxy.Write(processedObject);
            }
            //Executes transaction if needed
            if (tx != null)
              tx.Commit();
          }
          catch
          {
            //Abort the transaction if an error happend
            if (tx != null)
              tx.Abort();
          }
        }
        //In case space switched to backup mode inside the if block, an exception will be
        //thrown and we want to return to the loop and wait until the space returns
        //to primary mode or any other exception.
        catch (Exception ex)
        {
          Console.WriteLine(ex);
        }
      }
      else
        //Waits until the space returns to the primary mode
        WaitHandle.WaitAll(new WaitHandle[] { _signalShouldWork });
    }
  }

  public void Dispose()
  {
    //Will stop the working thread
    _isDisposed = true;
    //Unregister from space mode changed events
    _proxy.GetServerAdmin().SpaceModeChanged -= PollingContainer_SpaceModeChanged;
    //Waits for the polling thread to stop with a timeout
    _pollingThread.Join(_timeout * 5);
  }

}

The polling container receives a proxy, template and a delegate to a Process method and executes a polling routine that attempts to take an object from the given proxy by the given template, if it succeed it executes the process method on it and writes back the result to the proxy.

Since the polling container should only work on space which is in Primary mode, it first register to space mode changed events and will only work when the space is in Primary mode.

DoPoll does the polling routine on a separate thread and includes a method call to SpaceProxyProviderFactory.Instance.SetThreadContext(_proxy). This method call must be executed on every new thread created in the Processing Unit that needs to perform operations on the space.

To stop to polling the Dispose should be called.

DataFeeder

The data feeder is in charge of feeding the cluster with unprocessed data every second. It does so by creating new Data objects with a random type and random information, and writes it to the cluster.

Code
///
/// Data feeder feeds new data to the space that needs to be processed
///
public class DataFeeder : IProcessingUnit
{
[..]

///
/// Initializes the processing unit
///
///Cluster info related to this processing unit
///Gets custom properties related to the processing unit
public void Init(ClusterInfo clusterInfo, IDictionarycustomProperties)
{
//Get the space name from the properties
string spaceUrl;
customProperties.TryGetValue(Consts.SpaceUrlProperty, out spaceUrl);
if (String.IsNullOrEmpty(spaceUrl))
throw new ArgumentNullException("spaceUrl", "Custom Properties must contain a space name property (" + Consts.SpaceUrlProperty + ")");
//Get feed delay from the custom properties, if none found uses the default one
string feedDelayStr;
customProperties.TryGetValue(Consts.FeedDelayProperty, out feedDelayStr);
_feedDelay = (String.IsNullOrEmpty(feedDelayStr) ? DefaultFeedDelay : int.Parse(feedDelayStr));
//Connects to the space
_proxy = SpaceProxyProviderFactory.Instance.FindSpace(spaceUrl);
}

///
///Starts the processing unit
///
public void Start()
{
//Set the started state to true
_started = true;
//Create a working thread
_feedingThread = new Thread(new ThreadStart(StartWorking));
//Starts the working thread
_feedingThread.Start();
}

///
///Stops the processing unit
///
public void Stop()
{
//Set the started state to false
_started = false;
}

///
///Destroys the processing unit, any allocated resources should be cleaned up in this method
///
public void Destroy()
{
//Set the started state to false
_started = false;
//Wait for the working thread to finish its work
_feedingThread.Join(_feedDelay * 5);
//It is very important to dispose the space when done to clean up
//any uses resources
_proxy.Dispose();
}

///
/// Generates and feeds data to the space
///
private void StartWorking()
{
try
{
//This method must be called on each new thread that is created, it is
//used to set the context class loader on the JVM
SpaceProxyProviderFactory.Instance.SetThreadContext(_proxy);
Random random = new Random();
while (_started)
{
//Create a new data object with random info and random type
Data data = new Data(Guid.NewGuid().ToString(), random.Next(0, DataTypesCount));
Console.WriteLine("Added data object with info {0} and type {1}", data.Info, data.Type);
//Feed the data into the cluster
_proxy.Write(data);
Thread.Sleep(_feedDelay);
}
}
[..]
}
}

Configuration
jini://*/*/dotnetProcessingUnitDataExample

The StartWorking() method does the actual work, by creating a new Data object with random data in an unprocessed state every second, and feeds it to the cluster.

StartWorking includes a method call to SpaceProxyProviderFactory.Instance.SetThreadContext(_proxy) for the same reason described in the XAP66:DataProcessor\ section.

Building the Example

This example includes two scripts, compile.bat and copydeploymentfiles.bat. these two files needs to be executed.

From the <Example Root> directory (<GigaSpaces Root>\dotnet\examples\ProcessingUnit), call:

compile

This compiles all the related projects and creates the processing units dlls inside each project under Deployment\lib directory.

Then the copydeploymentfiles.bat file should be executed.

From the <Example Root> directory (<GigaSpaces Root>\dotnet\examples\ProcessingUnit), call:

copydeploymentfiles

This copies the Processing Units Deployment directory to the <GigaSpaces Root>\deploy directory which simplifies deployment through the gs-ui.

The Deployment Descriptor (pu.xml) should always reside under the META-INF\spring directory of your application. It is recommended to use the same directory structure for the lib directory of your processing units.

Deployment

There are two ways to deploy the Processing Units:

Standalone Deployment

One option is to run a standalone Processing Unit container and deploy the Processing Unit into it. This is done simply by calling the following commands from your <GigaSpaces Root>\bin directory:

The following deploys the data processor:

puInstance ..\dotnet\examples\ProcessingUnit\Processor\Deployment

The following deploys the data feeder:

puInstance ..\dotnet\examples\ProcessingUnit\Feeder\Deployment

Each command creates the standalone container and deploys the DataProcessor or DataFeeder Processing Units to it. Since the Processing Unit logic is defined in the Start method, it starts working immediately after it is deployed.

Grid Deployment

Under <GigaSpaces Root>\dotnet\examples\ProcessingUnit, you can find two directories: Feeder and Processor. These contain the two Processing Units projects, in each of this directory there's the Deployment directory.

This directory is in the required structure to deploy a Processing Unit, and is copied by the copydeploymentfiles script to the <GigaSpaces Root>\deploy directory. This allows deployment to the Service Grid using the gs-ui.

META-INF\spring is where pu.xml resides. This file defines exactly which Processing Unit to deploy.

After you run the build script and the copy deployment files script, the two directories are copied to the <GigaSpaces Root>\deploy directory, start up the Service Grid. Because this example runs in a partitioned cluster with two primary spaces and one backup space for each partition; you need to run one Grid Service Manager (GSM) and two Grid Service Containers (GSC), and then start the GigaSpaces Management Center:

Start <GigaSpaces Root>\bin\gsm
Start <GigaSpaces Root>\bin\gsc
Start <GigaSpaces Root>\bin\gsc
Start <GigaSpaces Root>\bin\gs-ui

For running the GSC in a 64-bit platform, follow instructions in: http://www.gigaspaces.com/wiki/display/XAP66/Running+Services+-+GSC+and+GSM#RunningServices-GSCandGSM-64bitConfiguration

Since the spaces are running inside the dataprocessor, it makes sense to deploy it first and the datafeeder second.

  1. In the GigaSpaces Management Center, click on the tab named Deployments, Details, and then click the Deploy new application button ().
  2. In the Deployment Wizard, choose SBA Application - Processing Unit and click Next.



  3. Now, all you need to do is type the name of the Processing Unit (identical to the name of the folder that is now in the deploy directory) in the Processing Unit Name field, for the dataprocessor select partitioned-sync2backup Cluster Schema, Number Of Instances: 2 Backups: 1 and click Deploy. This will create the partitioned cluster described above.
  4. Now in order to deploy the datafeeder Procesing Unit, you repeat the same processes but type datafeeder in the Processing Unit Name field, There is no need to select a Cluster Schema or Number Of Instances since the feeder connects to the cluster and doesn't create spaces. However, you can deploy more than one datafeeder by changing the Number Of Instances field.

  5. Now, check out the GSC consoles to see the events flowing in and out!
GigaSpaces.com - Legal Notice - 3rd Party Licenses - Site Map - API Docs - Forum - Downloads - Blog - White Papers - Contact Tech Writing - Gen. by Atlassian Confluence