Master-Worker Pattern

The Master-Worker Pattern (sometimes called the Master-Slave or the Map-Reduce pattern) is used for parallel processing. It follows a simple approach that allows applications to perform simultaneous processing across multiple machines or processes via a Master and multiple Workers.


In the GigaSpaces data grid, you can implement the Master-Worker pattern in one of the following ways:

  • Task Executors - best for a scenario where the processing activity is collocated with the data (the data is stored within the same space as the tasks being executed).
  • Polling Containers - the processing activity runs on a separate machine/VM from the space. Use this approach when the processing activity consumes a relatively large amount of CPU and takes a long time. It can also be relevant if the data required for the processing is not stored within the space, or if the time it takes to retrieve the required data from the space is much shorter than the time it takes to complete the processing.

 Implementing Master-Worker in GigaSpaces using Polling Containers

The Polling Containers approach uses the classic JavaSpaces write/take operations to implement the parallel processing. This allows a Master client application to generate a Job that is a set of Request objects, write these into the space and immediately perform a Take operation on the Result objects.

The Request object has an execute method that contains the actual processing business logic. The Workers, implemented via a polling container, perform a continuous Take operation on the Request objects. Once a Request object has been consumed, its execute method is called and a Result object is written back into the space. The Master application consumes these incoming Result objects. Once the number of Result objects consumed by the Master for the relevant Job matches the number of Request objects, the Job execution is complete.

When there is one space (with or without a backup) used by the Master and Workers, you can run the workers in blocking mode (take operation with timeout > 0). This means that once a matching Request is written into the space, one of the running workers immediately consumes it.
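The write/take flow described above can be sketched in plain Java without a space. This is only an in-memory illustration, not GigaSpaces code: two `BlockingQueue` instances stand in for the space, and the `WriteTakeSketch`, `Request`, `Result` and `runJob` names are hypothetical. The workers use a blocking take, and the master counts results until the count matches the number of requests.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the write/take flow: two queues play the role of the space.
class WriteTakeSketch {

    // Hypothetical minimal types, for illustration only.
    static class Result {
        final String taskId;
        Result(String taskId) { this.taskId = taskId; }
    }

    static class Request {
        final String taskId;
        Request(String taskId) { this.taskId = taskId; }
        Result execute() { return new Result(taskId); } // business logic would go here
    }

    // Runs one job and returns the number of Result objects the master collected.
    static int runJob(int jobId, int tasks, int workers) {
        BlockingQueue<Request> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Result> results = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Request request = requests.take(); // blocks until a Request arrives
                        results.put(request.execute());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // pool is shutting down
                }
            });
        }
        try {
            // Master: write the whole job, then take results until the counts match.
            for (int i = 0; i < tasks; i++) {
                requests.put(new Request(jobId + "_" + i));
            }
            int count = 0;
            while (count < tasks) {
                if (results.poll(1, TimeUnit.SECONDS) != null) {
                    count++;
                }
            }
            return count;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Master interrupted", e);
        } finally {
            pool.shutdownNow(); // interrupts the workers blocked in take()
        }
    }

    public static void main(String[] args) {
        System.out.println("Results collected: " + runJob(0, 100, 2));
    }
}
```

Because every worker blocks on the same queue, whichever worker is idle picks up the next request, which is the behavior the blocking mode provides with a single space.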

When running multiple Workers, processing is load-balanced evenly across all of them. When there is a large amount of activity, you might need to run a partitioned space so that the space layer can store a large number of Request objects (there will always be only a small number of Result objects in the space, because the Master consumes them as they arrive) and cope with a large number of Workers. This ensures that your system can scale and that the space layer does not become a bottleneck.

When running the space in clustered partitioned mode, you cannot run the workers in blocking mode without assigning a value to the routing field of the Request object. The routing field is the designated attribute that determines which partition an object is written to and read from. The Designated Workers approach allows you to run the workers against a partitioned space in blocking mode.

The following sections include code samples and configuration that illustrate the Master-Worker implementation via Polling Containers, using the Random Workers and Designated Workers approaches.

 Example 1 - Random Workers

With the Random Workers approach, each worker can consume Request objects from every space partition. In this case, the non-blocking mode is used. The workers scan the partitions in a round-robin fashion for a Request object to consume and execute. With this approach, there might be a small delay until the workers consume a Request object. This approach can also generate extra network traffic, because the workers connect to all existing partitions to look for Request objects to consume and, if none is found, wait for some time and then try again.
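The round-robin, non-blocking scan can be illustrated with a small in-memory sketch. It is not the product's implementation and the exact scan order the product uses is not stated here; the `RoundRobinScanSketch`, `tryTake` and `drain` names are hypothetical, and each `Queue` stands in for one partition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// In-memory stand-in: each Queue plays the role of one space partition.
class RoundRobinScanSketch {

    private final List<Queue<String>> partitions;
    private int next = 0; // partition to try first on the next pass

    RoundRobinScanSketch(List<Queue<String>> partitions) {
        this.partitions = partitions;
    }

    // One non-blocking pass over all partitions; returns null when all are empty.
    String tryTake() {
        for (int i = 0; i < partitions.size(); i++) {
            int p = (next + i) % partitions.size();
            String request = partitions.get(p).poll(); // returns immediately, never waits
            if (request != null) {
                next = (p + 1) % partitions.size(); // start after this partition next time
                return request;
            }
        }
        return null;
    }

    // Consumes requests until a full pass finds nothing.
    // A real worker would wait for a while and retry instead of stopping.
    static List<String> drain(List<Queue<String>> partitions) {
        RoundRobinScanSketch worker = new RoundRobinScanSketch(partitions);
        List<String> consumed = new ArrayList<>();
        for (String r = worker.tryTake(); r != null; r = worker.tryTake()) {
            consumed.add(r);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Queue<String>> partitions = new ArrayList<>();
        partitions.add(new ArrayDeque<>(Arrays.asList("A1", "A2")));
        partitions.add(new ArrayDeque<>(Arrays.asList("B1")));
        System.out.println(drain(partitions)); // [A1, B1, A2]
    }
}
```

The empty passes are where the delay and the extra network traffic come from: a worker has to visit every partition before it can conclude that there is nothing to consume.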

The Master application:

import java.util.Date;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.UrlSpaceConfigurer;

public class Master {

    static GigaSpace space;

    public static void main(String[] args) {
        space = new GigaSpaceConfigurer(new UrlSpaceConfigurer("jini://*/*/mySpace")).gigaSpace();

        for (int i = 0; i < 10; i++) {
            submitJob(i, 100);
        }
    }

    public static void submitJob(int jobId, int tasks) {
        System.out.println(new Date() + " - Executing Job " + jobId);
        Request[] requests = new Request[tasks];
        for (int i = 0; i < tasks; i++) {
            requests[i] = new Request();
            requests[i].setJobID(jobId);
            requests[i].setTaskID(jobId + "_" + i);
        }
        // Write all the Request objects of this Job into the space.
        space.writeMultiple(requests);

        // Take Result objects until their number matches the number of Requests.
        int count = 0;
        Result responseTemplate = new Result();
        Result[] responses = new Result[tasks];
        while (true) {
            Result response = space.take(responseTemplate, 1000);
            if (response != null) {
                responses[count] = response;
                count++;
            }
            if (count == tasks) {
                System.out.println(new Date() + " - Done executing Job " + jobId);
                break;
            }
        }
    }
}

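The Worker, implemented as a polling container:

import org.openspaces.events.EventDriven;
import org.openspaces.events.EventTemplate;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.events.polling.Polling;
import org.openspaces.events.polling.ReceiveHandler;
import org.openspaces.events.polling.receive.ReceiveOperationHandler;
import org.openspaces.events.polling.receive.SingleTakeReceiveOperationHandler;

@EventDriven @Polling(concurrentConsumers = 2)
public class Worker {

    public Worker() {
        System.out.println("Worker started!");
    }

    @EventTemplate
    Request template() {
        return new Request(); // empty template: matches any Request
    }

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
        receiveHandler.setNonBlocking(true); // non-blocking take, as this approach requires
        return receiveHandler;
    }

    @SpaceDataEvent
    public Result execute(Request request) {
        // Process the data here; the sleep simulates the work.
        try { Thread.sleep(1000); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Result response = new Result();
        return response;
    }
}
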
The Worker configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd">

    <!-- Enable scan for OpenSpaces and Spring components -->
    <context:component-scan base-package="org.openspaces.example.masterworker.nonblocking"/>
    <!-- Enable support for @Polling annotation -->
    <os-events:annotation-support />
    <os-core:space-proxy id="space" name="mySpace" />
    <os-core:giga-space id="gigaSpace" space="space"/>
</beans>

The Request and Result classes and their common base class:

public class Base {

    public Base() {}

    Integer jobID;
    String taskID;
    Object payload;

    public Integer getJobID() {
        return jobID;
    }

    public void setJobID(Integer jobID) {
        this.jobID = jobID;
    }

    public String getTaskID() {
        return taskID;
    }

    public void setTaskID(String taskID) {
        this.taskID = taskID;
    }

    public Object getPayload() {
        return payload;
    }

    public void setPayload(Object payload) {
        this.payload = payload;
    }
}

public class Request extends Base {
    public Request() {}
}

public class Result extends Base {
    public Result() {}
}


 Example 2 - Designated Workers

With this approach, each new worker is assigned a specific ID and consumes Request objects from a designated partition. In this case, the worker runs in blocking mode. The routing field of the Request object is set to the partition ID in two places: in the Polling Container template used by the worker, and by the Master application before it writes the Request object into the partitioned clustered space.

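With this approach, the number of Workers should be greater than or equal to the number of partitions. Otherwise, Request objects routed to a partition that has no worker attached to it are never consumed.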
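The routing arithmetic behind this requirement can be checked with a short stand-alone sketch. The two formulas mirror the ones used in the Master and Worker samples for this example; the `DesignatedRoutingSketch` class, its method names, and the assumption that worker instance IDs start at 1 (suggested by the `- 1` in the worker code) are illustrative.

```java
import java.util.Set;
import java.util.TreeSet;

class DesignatedRoutingSketch {

    // Routing value the Master assigns to the i-th Request of a job.
    static int requestRouting(int requestIndex, int partitions) {
        return requestIndex % partitions;
    }

    // Routing value a worker uses in its template; instance IDs are assumed to start at 1.
    static int workerRouting(int instanceId, int partitions) {
        return (instanceId - 1) % partitions;
    }

    // Routing values that have at least one worker attached.
    static Set<Integer> coveredPartitions(int workers, int partitions) {
        Set<Integer> covered = new TreeSet<>();
        for (int id = 1; id <= workers; id++) {
            covered.add(workerRouting(id, partitions));
        }
        return covered;
    }

    public static void main(String[] args) {
        System.out.println(coveredPartitions(4, 2)); // [0, 1]
        System.out.println(coveredPartitions(1, 2)); // [0]
    }
}
```

With four workers and two partitions, both routing values have workers attached. With a single worker and two partitions, requests with routing value 1 would stay in the space.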

See below how the Designated Workers approach should be implemented:

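import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.UrlSpaceConfigurer;

import com.j_spaces.core.client.SpaceURL;

public class Master {

    static GigaSpace space;
    static int partitions;

    public static void main(String[] args) {
        space = new GigaSpaceConfigurer(new UrlSpaceConfigurer("jini://*/*/mySpace")).gigaSpace();
        String total_members = space.getSpace().getURL().getProperty(SpaceURL.CLUSTER_TOTAL_MEMBERS);
        if (total_members != null) {
            // Keep the part before the comma: the number of partitions.
            partitions = Integer.valueOf(total_members.substring(0, total_members.indexOf(","))).intValue();
        } else {
            partitions = 1;
        }

        for (int i = 0; i < 10; i++) {
            submitJob(i, 100);
        }
    }

    public static void submitJob(int jobId, int tasks) {
        System.out.println(new Date() + " - Executing Job " + jobId);
        Request[] requests = new Request[tasks];
        AtomicInteger index = new AtomicInteger(0);
        for (int i = 0; i < tasks; i++) {
            requests[i] = new Request();
            requests[i].setJobID(jobId);
            requests[i].setTaskID(jobId + "_" + i);
            // Spread the requests across the partitions.
            requests[i].setRouting(index.getAndIncrement() % partitions);
        }
        // Write all the Request objects of this Job into the space.
        space.writeMultiple(requests);

        int count = 0;
        Result responseTemplate = new Result();
        while (true) {
            Result[] responses = space.takeMultiple(responseTemplate, Integer.MAX_VALUE);
            if (responses.length > 0) {
                count = count + responses.length;
            }
            if (count == tasks) {
                System.out.println(new Date() + " - Done executing Job " + jobId);
                break;
            }
            try {
                Thread.sleep(1000); // pause before polling again; the interval is illustrative
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
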
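The Worker, implemented as a polling container:

import org.openspaces.core.GigaSpace;
import org.openspaces.core.cluster.ClusterInfo;
import org.openspaces.core.cluster.ClusterInfoAware;
import org.openspaces.events.EventDriven;
import org.openspaces.events.EventTemplate;
import org.openspaces.events.adapter.SpaceDataEvent;
import org.openspaces.events.polling.Polling;
import org.openspaces.events.polling.ReceiveHandler;
import org.openspaces.events.polling.receive.ReceiveOperationHandler;
import org.openspaces.events.polling.receive.SingleTakeReceiveOperationHandler;
import org.springframework.beans.factory.annotation.Autowired;

import com.j_spaces.core.client.SpaceURL;

@EventDriven @Polling(concurrentConsumers = 1)
public class Worker implements ClusterInfoAware {

    public void setClusterInfo(ClusterInfo clusterInfo) {
        System.out.println("--------------- > setClusterInfo called");
        if (clusterInfo != null) {
            this.clusterInfo = clusterInfo;
            System.out.println("--------------- > Worker " +
                clusterInfo.getInstanceId() + " started");

            String total_members = gigaspace.getSpace().getURL().getProperty(SpaceURL.CLUSTER_TOTAL_MEMBERS);

            int partitions;
            if (total_members != null) {
                // Keep the part before the comma: the number of partitions.
                partitions =
                    Integer.valueOf(total_members.substring(0, total_members.indexOf(","))).intValue();
            } else {
                partitions = 1;
            }

            System.out.println("--------------- > " + gigaspace.getSpace().getName() +
                " Space got " + partitions + " partitions ");
            // Attach this worker to one partition, based on its instance ID.
            routingValue = (clusterInfo.getInstanceId() - 1) % partitions;

            System.out.println("--------------- > Worker " + clusterInfo.getInstanceId() +
                " attached to Partition:" + routingValue);
        } else {
            System.out.println("--------------- > Worker started in non clustered mode");
            routingValue = 0;
        }
    }

    private ClusterInfo clusterInfo;

    public Worker() {}

    private Integer routingValue;

    // Injected by Spring; assumes a single GigaSpace bean in the configuration.
    @Autowired
    GigaSpace gigaspace;

    public void afterPropertiesSet() throws Exception {
        // Initialization hook; intentionally left empty here.
    }

    @EventTemplate
    Request template() {
        Request template = new Request();
        // Match only the Request objects routed to this worker's partition.
        template.setRouting(routingValue);
        return template;
    }

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
        return receiveHandler;
    }

    @SpaceDataEvent
    public Result execute(Request request) {
        // Process the data here; the sleep simulates the work.
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Result response = new Result();
        return response;
    }
}
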
The Worker configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd">

    <!-- Enable scan for OpenSpaces and Spring components -->
    <context:component-scan base-package="org.openspaces.example.masterworker.blocking"/>
    <!-- Enable support for @Polling annotation -->
    <os-events:annotation-support />
    <os-core:space-proxy id="space" name="mySpace" />
    <os-core:giga-space id="gigaSpace" space="space"/>
</beans>

The Request and Result classes and their common base class:

import com.gigaspaces.annotation.pojo.SpaceRouting;

public class Base {

    public Base() {}

    Integer jobID;
    String taskID;
    Object payload;
    Integer routing;

    public Integer getJobID() {
        return jobID;
    }

    public void setJobID(Integer jobID) {
        this.jobID = jobID;
    }

    public String getTaskID() {
        return taskID;
    }

    public void setTaskID(String taskID) {
        this.taskID = taskID;
    }

    public Object getPayload() {
        return payload;
    }

    public void setPayload(Object payload) {
        this.payload = payload;
    }

    // Marks this property as the routing field used to select the partition.
    @SpaceRouting
    public Integer getRouting() {
        return routing;
    }

    public void setRouting(Integer routing) {
        this.routing = routing;
    }
}

public class Request extends Base {
    public Request() {}
}

public class Result extends Base {
    public Result() {}
}


Deploying the clustered space PU:

>gs deploy-space -cluster schema=partitioned total_members=2 mySpace

Deploying the Workers PU:

>gs deploy -cluster total_members=4 MasterWorker.jar
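
Both the Master and the Worker samples derive the number of partitions from the cluster's total members value by cutting the string at the first comma, which fails if the value contains no comma. A more tolerant parser might look like the sketch below. The `TotalMembersSketch` class and `parsePartitions` method are hypothetical, and the assumption that the value looks like `"2,1"` (partitions, then backups) or a bare `"2"` is inferred from the sample code and the deploy commands above rather than confirmed.

```java
class TotalMembersSketch {

    // Returns the partition count from a value such as "2,1" or "2".
    // A null or blank value is treated as a single, non-clustered space.
    static int parsePartitions(String totalMembers) {
        if (totalMembers == null || totalMembers.trim().isEmpty()) {
            return 1;
        }
        int comma = totalMembers.indexOf(',');
        String primaries = comma >= 0 ? totalMembers.substring(0, comma) : totalMembers;
        return Integer.parseInt(primaries.trim());
    }

    public static void main(String[] args) {
        System.out.println(parsePartitions("2,1")); // 2
        System.out.println(parsePartitions("2"));   // 2
        System.out.println(parsePartitions(null));  // 1
    }
}
```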

 .NET Implementation

Here is an example of the Master Worker Pattern for .NET.
