Summary: This example demonstrates basic usage of a .NET Processing Unit. The example contains two Processing Unit Containers: one that feeds data objects into the system, and another that reads these objects and processes them.
This page describes an older version of XAP.NET (7.0.0). For later versions, refer to the current documentation.
Example Root: <GigaSpaces Root>\Examples\ProcessingUnit

Overview

This example demonstrates basic usage of the .NET Processing Unit Container – a complete SBA application that can easily scale.

Architecture

This example includes a module that is deployed to the grid, and a domain model that consists of Data objects. The DataFeeder module runs within a Processing Unit Container and writes Data objects with raw data into the remote space. The space is actually embedded within the other Processing Unit Container, which runs the DataProcessor module.

The DataProcessor service takes the new Data objects, processes the raw data and writes them back to the space.

The example solution is based on three projects:

  1. Entries - holds the Data object
  2. Feeder - holds the DataFeeder processing unit container
  3. Processor - holds the DataProcessor processing unit container and related classes

Application Workflow

  1. The DataFeeder writes non-processed Data objects into the space every second.
  2. The DataProcessor takes non-processed Data objects, processes them, and writes the processed Data objects back to the space.

Data Domain Model

The only object in our model is the Data object.

[SpaceClass]  
public class Data
{
[..]
  /// <summary>
  /// Gets the data type, used as the routing index inside the cluster
  /// </summary>
  [SpaceRouting]
  public Nullable<int> Type
  {
    get { return _type; }
    set { _type = value; }
  }
  /// <summary>
  /// Gets the data info
  /// </summary>
  public string Info
  {
    get { return _info; }
    set { _info = value; }
  }
  /// <summary>
  /// Gets or sets the data processed state
  /// </summary>
  public bool Processed
  {
    get { return _processed; }
    set { _processed = value; }
  }
[..]
}

Note the attributes that are used in this object:

  • SpaceClass – the marked object is written into a space.
  • SpaceRouting – when using a partitioned cluster topology, Data objects are routed to the appropriate partitions according to the specified attribute, in this case Type.
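Conceptually, a partition is chosen by hashing the routing value modulo the number of partitions. The sketch below illustrates that scheme only; the actual GigaSpaces routing algorithm is internal and may differ:

```csharp
// Illustrative only: how a routing value conceptually maps to a partition.
// This is NOT the actual GigaSpaces implementation.
static int RouteToPartition(int routingValue, int numberOfPartitions)
{
    // Non-negative hash of the routing value, modulo the partition count
    return (routingValue.GetHashCode() & 0x7fffffff) % numberOfPartitions;
}
```

With two partitions, for example, Data objects whose Type hashes to an even value land in one partition and the rest in the other, so a single Type value always targets the same partition.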

Every Data object is written to the space by the DataFeeder with the Processed value set to false; the DataProcessor later sets it to true.
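The Processed flag also serves as the matching criterion: a template object with Processed set selects only entries in that state. As a sketch, assuming proxy is an ISpaceProxy connected to the cluster and that a Read overload accepting a template object is available in this version, a client could look up a processed result like this:

```csharp
// Sketch: template matching on the Processed flag.
// Assumes an ISpaceProxy.Read(template) overload exists in this version.
Data template = new Data();
template.Processed = true;            // match only already-processed entries
Data processed = proxy.Read(template);
if (processed != null)
    Console.WriteLine("Found processed data with info: " + processed.Info);
```

The processor's own UnprocessedData template (shown below) works the same way, only with Processed set to false.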

DataProcessor

Code
/// <summary>
/// Process given data from cluster and write the processed data back to the cluster.
/// This processing unit is also responsible for starting up the space that it processes the
/// data from.
/// </summary>
public class DataProcessor : IProcessingUnitContainer
{
[..]
  ///<summary>
  ///Initializes the processing unit
  ///</summary>
  public void Initialize()
  {
    //Get the space name from the properties
    string spaceUrl;
    _properties.TryGetValue(Consts.SpaceUrlProperty, out spaceUrl);
    if (String.IsNullOrEmpty(spaceUrl))
      throw new ArgumentNullException("spaceUrl", "Custom Properties must contain a space name property (" + Consts.SpaceUrlProperty + ")");
    //Gets the use transaction state from the custom properties
    string useTransactionStr;
    _properties.TryGetValue(Consts.UseTransactionsProperty, out useTransactionStr);
    _useTransactions = (String.IsNullOrEmpty(useTransactionStr) ? false : bool.Parse(useTransactionStr));      
    //Starts the space as a part of the cluster, the DataProcessor takes data from its embedded space only
    SpaceConfig spaceConfig = new SpaceConfig();
    spaceConfig.ClusterInfo = _clusterInfo;
    ISpaceProxy clusterProxy = SpaceProxyProviderFactory.Instance.FindSpace(spaceUrl, spaceConfig);
    //Gets the direct proxy from the cluster proxy that will be used by this processing unit
    _proxy = clusterProxy.GetServerAdmin().GetDirectProxy();
    //Create the event listener container 
    _eventListenerContainer = EventListenerContainerFactory.CreateContainer<Data>(_proxy, new ExampleProcessor());
    //If transaction should be used, supply the container with a local transaction manager
    if (_useTransactions)
        _eventListenerContainer.TransactionManager = _proxy.LocalTransactionManager;
    //Start the event listener container
    _eventListenerContainer.Start();

    Console.Write("DataProcessor started, waiting for data that needs processing...");     
  }        

  ///<summary>
  ///Destroys the processing unit, any allocated resources should be cleaned up in this method
  ///</summary>
  public void Dispose()
  {
    if (_eventListenerContainer != null)
    {
       _eventListenerContainer.Stop();
       _eventListenerContainer.Dispose();
    }
    //This processing unit created the space, therefore it should also close it
    //by calling Shutdown on the server admin
    _proxy.GetServerAdmin().Shutdown();
    //Dispose the space proxy to clean up any used resources
    _proxy.Dispose();
  }
  
  /// <summary>
  /// This property is set at deploy-time by the service grid, it contains the cluster
  /// information of this processing unit instance
  /// </summary>
  public ClusterInfo ClusterInfo
  {
    set { _clusterInfo = value; }
  }
  /// <summary>
  /// This property is set at deploy time by the service grid, it contains deploy-time context
  /// properties and any additional properties passed by the user
  /// </summary>
  public IDictionary<string, string> Properties
  {
    set { _properties = value; }
  }	  
}

Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="GigaSpaces.XAP" type="GigaSpaces.XAP.Configuration.GigaSpacesXAPConfiguration, GigaSpaces.Core"/>
  </configSections>
  <appSettings>
    <add key="SpaceUrl" value="/./dotnetProcessingUnitDataExample"/>
    <add key="UseTransactions" value="false"/>
  </appSettings>
  <GigaSpaces.XAP>
    <ProcessingUnitContainer ImplementingType="GigaSpaces.Examples.ProcessingUnit.Processor.DataProcessor, GigaSpaces.Examples.ProcessingUnit.Processor"/>
  </GigaSpaces.XAP>
</configuration>

The actual work is done by a Polling Container and the ExampleProcessor. The polling container provides the data-event abstraction that triggers the business logic: it takes unprocessed Data objects from the space, executes the ExampleProcessor.ProcessData method on each of them, and writes the processed data back to the space.

See Event Listener Container for more info about event listening abstraction.

On Initialize, the Processing Unit starts the embedded space with the given ClusterInfo.

Another important thing to note is that the Processing Unit explicitly calls _proxy.GetServerAdmin().Shutdown(); on Dispose(), which shuts down the embedded space. Otherwise, the space remains active even when this Processing Unit is undeployed.

ExampleProcessor

The example processor is the business logic behind the data processor. It contains the actual processing method, which is called by a Polling Container. The attributes on the class are used to create and configure the polling container that triggers the data event and invokes the ProcessData method, which implements the business logic. The polling container is aware of the mode the space is in, and only works when the space is in primary mode.

/// <summary>
/// This class contains the processing logic, marked as polling event driven.
/// </summary>
[PollingEventDriven(MinConcurrentConsumers = 1, MaxConcurrentConsumers = 4)]
internal class ExampleProcessor
{ 
  /// <summary>
  /// Gets a template that matches unprocessed data
  /// </summary>
  [EventTemplate]
  public Data UnprocessedData
  {
    get
    { 
      Data template = new Data();
      template.Processed = false;
      return template;
    }
  }
 
  /// <summary>
  /// Fake delay that represents processing time
  /// </summary>
  private const int ProcessingTime = 100;
  /// <summary>
  /// Receives a data and process it
  /// </summary>
  /// <param name="data">Data received</param>
  /// <returns>Processed data</returns>
  [DataEventHandler]
  public Data ProcessData(Data data)
  {
      Console.WriteLine("**** processing - Took element with info: " + data.Info);
      //Process data...      
      Thread.Sleep(ProcessingTime);
      //Set data state to processed
      data.Processed = true;
      Console.WriteLine("**** processing - done");
      return data;
  }
}

DataFeeder

The data feeder is in charge of feeding the cluster with unprocessed data every second. It does so by creating new Data objects with a random type and random information, and writing them to the cluster.

Code
/// <summary>
/// Data feeder feeds new data to the space that needs to be processed
/// </summary>
public class DataFeeder : IProcessingUnitContainer
{
  [..]
  
  ///<summary>  
  /// Initializes the processing unit    
  ///</summary>    
  public void Initialize()
  {
    //Get the space name from the properties
    string spaceUrl;
    _properties.TryGetValue(Consts.SpaceUrlProperty, out spaceUrl);
    if (String.IsNullOrEmpty(spaceUrl))
      throw new ArgumentNullException("spaceUrl", "Custom Properties must contain a space name property (" + Consts.SpaceUrlProperty + ")");
    //Get feed delay from the custom properties, if none found uses the default one
    string feedDelayStr;
    _properties.TryGetValue(Consts.FeedDelayProperty, out feedDelayStr);
    _feedDelay = (String.IsNullOrEmpty(feedDelayStr) ? DefaultFeedDelay : int.Parse(feedDelayStr));            
    //Connects to the space
    _proxy = SpaceProxyProviderFactory.Instance.FindSpace(spaceUrl);
    //Set the started state to true
    _started = true;
    //Create a working thread
    _feedingThread = new Thread(new ThreadStart(StartWorking));
    //Starts the working thread
    _feedingThread.Start();
  }    

  ///<summary>
  ///Destroys the processing unit, any allocated resources should be cleaned up in this method
  ///</summary>
  public void Dispose()
  {
    //Set the started state to false
    _started = false;
    //Wait for the working thread to finish its work
    _feedingThread.Join(_feedDelay * 5);
    //It is very important to dispose the space proxy when done, to clean up
    //any used resources
    _proxy.Dispose();
  }
  
  /// <summary>
  /// This property is set at deploy time by the service grid, it contains deploy-time context
  /// properties and any additional properties passed by the user
  /// </summary>
  public IDictionary<string, string> Properties
  {
    set { _properties = value; }
  }

  /// <summary>
  /// Generates and feeds data to the space
  /// </summary>
  private void StartWorking()
  {
    try
    {
      Random random = new Random();
      while (_started)
      {
        //Create a new data object with random info and random type
        Data data = new Data(Guid.NewGuid().ToString(), random.Next(0, DataTypesCount));
        Console.WriteLine("Added data object with info {0} and type {1}", data.Info, data.Type);
        //Feed the data into the cluster
        _proxy.Write(data);
        Thread.Sleep(_feedDelay);
      }
    }
    [..]
  }
}

Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="GigaSpaces.XAP" type="GigaSpaces.XAP.Configuration.GigaSpacesXAPConfiguration, GigaSpaces.Core"/>
  </configSections>
  <appSettings>
    <add key="SpaceUrl" value="jini://*/*/dataExampleSpace"/>
  </appSettings>
  <GigaSpaces.XAP>
    <ProcessingUnitContainer ImplementingType="GigaSpaces.Examples.ProcessingUnit.Feeder.DataFeeder, GigaSpaces.Examples.ProcessingUnit.Feeder"/>
  </GigaSpaces.XAP>
</configuration>

The StartWorking() method does the actual work: every second it creates a new Data object with random data in an unprocessed state and feeds it to the cluster.

Building the Example

This example includes a compile.bat script.

From the <Example Root> directory (<GigaSpaces Root>\Examples\ProcessingUnit), call:

compile

This compiles all the related projects and creates the Processing Unit DLLs inside each project's Deployment directory. It also copies each Processing Unit's Deployment directory to the <GigaSpaces Root>\Runtime\deploy directory, which simplifies deployment through the gs-ui.

The Deployment config file (pu.config) should always reside under the root directory of your application.
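Such a pu.config follows the same schema as the configuration files shown above. A minimal sketch, where the implementing type name is a placeholder for your own container class:

```xml
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="GigaSpaces.XAP" type="GigaSpaces.XAP.Configuration.GigaSpacesXAPConfiguration, GigaSpaces.Core"/>
  </configSections>
  <GigaSpaces.XAP>
    <!-- Placeholder type name; point this at your own IProcessingUnitContainer implementation -->
    <ProcessingUnitContainer ImplementingType="MyApp.MyContainer, MyApp"/>
  </GigaSpaces.XAP>
</configuration>
```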

Deployment

There are two ways to deploy the Processing Units:

Standalone Deployment

One option is to run a standalone Processing Unit container and deploy the Processing Unit into it. This is done simply by calling the following commands from your <GigaSpaces Root>\Bin directory:

The following deploys the data processor:

puInstance ..\Examples\ProcessingUnit\Processor\Deployment

The following deploys the data feeder:

puInstance ..\Examples\ProcessingUnit\Feeder\Deployment

Each command creates the standalone container and deploys the DataProcessor or DataFeeder Processing Units to it. Since the Processing Unit logic is defined in the Initialize method, it starts working immediately after it is deployed.

Grid Deployment

Under <GigaSpaces Root>\Examples\ProcessingUnit there are two directories, Feeder and Processor, which contain the two Processing Unit projects. Each of these directories contains a Deployment directory in the structure required for deploying a Processing Unit; the copydeploymentfiles script copies it to the <GigaSpaces Root>\Runtime\deploy directory. This allows deployment to the Service Grid using the gs-ui.

The pu.config resides in the Deployment directory of each processing unit. This file defines exactly which Processing Unit to deploy.

After you run the build script and the copy-deployment-files script, the two directories are copied to the <GigaSpaces Root>\Runtime\deploy directory. Next, start up the Service Grid. Because this example runs in a partitioned cluster with two primary spaces and one backup space for each partition, you need to run one Grid Service Manager (GSM) and two Grid Service Containers (GSCs), and then start the GigaSpaces Management Center:

Start <GigaSpaces Root>\Bin\gsm
Start <GigaSpaces Root>\Bin\gsc
Start <GigaSpaces Root>\Bin\gsc
Start <GigaSpaces Root>\Bin\gs-ui

Since the spaces are running inside the dataprocessor, it makes sense to deploy the dataprocessor first and the datafeeder second.

  1. In the GigaSpaces Management Center, click the Deployments tab, select Details, and then click the Deploy new application button.
  2. In the Deployment Wizard, choose SBA Application - Processing Unit, and click Next.



  3. Type the name of the Processing Unit (identical to the name of its folder in the deploy directory) in the Processing Unit Name field. For the dataprocessor, select the partitioned-sync2backup Cluster Schema, set Number Of Instances to 2 and Backups to 1, and click Deploy. This creates the partitioned cluster described above.
  4. To deploy the datafeeder Processing Unit, repeat the same process but type datafeeder in the Processing Unit Name field. There is no need to select a Cluster Schema or Number Of Instances, since the feeder connects to the cluster and doesn't create spaces. However, you can deploy more than one datafeeder by changing the Number Of Instances field.

  5. Now, check out the GSC consoles to see the events flowing in and out!