Master-Worker Pattern


Author XAP Version Last Updated Reference Download
Shay Hassidim 7.0 August 2009

Overview

The Master-Worker Pattern (sometimes called the Master-Slave or the Map-Reduce pattern) is used for parallel processing. It follows a simple approach that allows applications to perform simultaneous processing across multiple machines or processes via a Master and multiple Workers.

the_master_worker.jpg

In GigaSpaces XAP, you can implement the Master-Worker pattern using several methods:

  • Task Executors - best for a scenario where the processing activity is collocated with the data (the data is stored within the same space as the tasks being executed).
  • Polling Containers - in this case the processing activity runs in a separate machine/VM from the space. This approach should be used when the processing activity consumes a relatively large amount of CPU and takes a large amount of time. It might also be relevant if the actual data required for the processing is not stored within the space, or the time it takes to retrieve the required data from the space is much shorter than the time it takes to complete the processing.

Implementing Master-Worker in XAP using Polling Containers

The Polling Containers approach uses the classic JavaSpaces write/take operations to implement the parallel processing. This allows a Master client application to generate a Job that is a set of Request objects, write these into the space and immediately perform a Take operation on the Result objects.

The Request object has an execute method that includes the actual processing business logic. The Workers, implemented via a polling container, perform a continuous Take operation on the Request objects. Once a Request object has been consumed, its execute method is called and a Result object is written back into the space. The Master application consumes these incoming objects. Once the amount of Result objects consumed by the Master for the relevant Job matches the amount of Request objects, the Job execution has been completed.

When there is one space (with or without a backup) used by the Master and Workers, you can run the workers in blocking mode (take operation with timeout > 0). This means that once a matching Request is written into the space, one of the running workers immediately consumes it.

When running multiple Workers, processing is load-balanced across all the workers in an even manner. When there is a large amount of activity, you might need to run a partitioned space to allow the space layer to store a large number of Request objects (there will always be a small number of Result objects in the space), and to cope with a large number of Workers. This makes sure that your system can scale, and the space layer does not act as a bottleneck.

When running the space in clustered partitioned mode, you cannot run the workers in blocking mode without assigning a value to the Request object routing field. The Designated Workers approach allows you to run the workers against a partitioned space, in blocking mode.

The following sections include code samples and configuration that illustrate the Master-Worker implementation via Polling Containers, using the Random Workers and Designated Workers approach.

Tip

We invite you to download the code examples and configuration files used with this article.

Example 1 - Random Workers

With the Random Workers approach, each worker can consume Request objects from every space partition. In this case, the non-blocking mode is used. The workers scan the partitions in a round-robin fashion for a Request object to consume and execute. With this approach, there might be a small delay until the workers consume a Request object. This approach might generate some chatting over the network, since the workers connect to all existing partitions to look for Request objects to consume and in case none is found, wait for some time and then try again.

Step 1:

master_worker_rr1.jpg

Step 2:

master_worker_rr2.jpg

public class Master {

    static GigaSpace space;

    public static void main(String[] args) {
        space= new GigaSpaceConfigurer(new UrlSpaceConfigurer("jini://**/**/mySpace")).gigaSpace();

        for (int i=0;i<10;i++)
        {
            submitJob(i, 100);
        }
        System.exit(0);
    }

    static public void submitJob(int jobId , int tasks)
    {
        System.out.println(new Date(System.currentTimeMillis())+
            " - Executing Job " +jobId);
        Request requests [] = new Request [tasks];
        for (int i=0;i<tasks; i++)
        {
            requests [i] = new  Request ();
            requests [i].setJobID(jobId);
            requests [i].setTaskID(jobId + "_"+i);
        }

        space.writeMultiple(requests);
        int count = 0;
        Result reponseTemplate = new Result();
        reponseTemplate.setJobID(jobId);
        Result reponses[] = new Result [tasks];
        while (true)
        {
            Result reponse = space.take(reponseTemplate,1000);
            if (reponse!=null)
            {
                reponses[count]= reponse ;
                count ++;
            }
            if (count == tasks)
            {
                System.out.println(new Date(System.currentTimeMillis())+ " - Done executing Job " +jobId);
                break;
            }
        }
    }
}
@EventDriven @Polling (concurrentConsumers=2)
public class Worker {

    public Worker ()
    {
        System.out.println("Worker started!");
    }

    @EventTemplate
    Request template() {
        Request template = new Request();
        return template;
    }

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
        receiveHandler.setNonBlocking(true);
        receiveHandler.setNonBlockingFactor(10);
        return receiveHandler;
    }

    @SpaceDataEvent
    public Result execute(Request request) {
        //process Data here
        try {Thread.sleep(1000);} catch (InterruptedException e) {
            e.printStackTrace();
        }
        Result reponse = new Result ();
        reponse.setJobID(request.getJobID());
        reponse.setTaskID(request.getTaskID());
        return reponse;
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd">

    <!-- Enable scan for OpenSpaces and Spring components -->
    <context:component-scan base-package="org.openspaces.example.masterworker.nonblocking"/>
    <!-- Enable support for @Polling annotation -->
    <os-events:annotation-support />
    <os-core:space-proxy id="space" name="mySpace" />
    <os-core:giga-space id="gigaSpace" space="space"/>
    <os-core:giga-space-context/>
    <os-remoting:annotation-support/>
</beans>
@SpaceClass
public class Base {

    public Base (){}

    Integer jobID;
    String taskID;
    Object payload;

    @SpaceProperty(index=IndexType.BASIC)
    @SpaceRouting
    public Integer getJobID() {
        return jobID;
    }
    public void setJobID(Integer jobID) {
        this.jobID = jobID;
    }

    @SpaceId(autoGenerate=false)
    public String getTaskID() {
        return taskID;
    }
    public void setTaskID(String taskID) {
        this.taskID = taskID;
    }
    public Object getPayload() {
        return payload;
    }
    public void setPayload(Object payload) {
        this.payload = payload;
    }
}
@SpaceClass
public class Request extends Base{
    public Request (){}

}
@SpaceClass
public class Result extends Base {
    public Result (){}

}

Deploying the clustered space PU:

>gs deploy-space -cluster schema=partitioned total_members=2 mySpace

Deploying the Workers PU:

>gs deploy -cluster total_members=4 MasterWorker.jar

Example 2 - Designated Workers

With this approach, each new worker is assigned a specific ID and consumes Request objects from a designated partition. In this case, the worker runs in blocking mode. The Request object routing field is populated with the partition ID, with the Polling Container template, and is also populated by the Master application before it is written into the partitioned clustered space.

Step 1:

master_worker_de1.jpg

Step 2:

master_worker_de2.jpg

Tip

With this approach the number of Workers should be greater than or equal to the number of partitions.

See below how the Designated Workers approach should be implemented:

public class Master {

    static GigaSpace space;
    static int partitions;

    public static void main(String[] args) {
        space = new GigaSpaceConfigurer(new UrlSpaceConfigurer("jini://*/*/mySpace")).gigaSpace();
        String total_members = space.getSpace().getURL().getProperty(SpaceURL.CLUSTER_TOTAL_MEMBERS);
        if (total_members != null)
            partitions =
                Integer.valueOf(total_members.substring(0,total_members.indexOf(","))).intValue();
        else
            partitions =1;

        for (int i=0;i<10;i++)
        {
            submitJob(i, 100);
        }
        System.exit(0);
    }

    static public void submitJob(int jobId , int tasks)
    {
        System.out.println(new Date(System.currentTimeMillis())+ " - Executing Job " +jobId);
        Request requests [] = new Request [tasks];
        AtomicInteger index = new AtomicInteger(0);
        for (int i=0;i<tasks; i++)
        {
            requests [i] = new  Request ();
            requests [i].setJobID(jobId);
            requests [i].setTaskID(jobId + "_"+i);
            requests [i].setRouting(index.getAndIncrement() %partitions );
        }

        space.writeMultiple(requests);
        int count = 0;
        Result reponseTemplate = new Result();
        reponseTemplate.setJobID(jobId);
        while (true)
        {
            Result _reponses[] = space.takeMultiple(reponseTemplate,Integer.MAX_VALUE);
            if (_reponses.length > 0)
            {
                count = count +_reponses.length;
            }
            if (count == tasks)
            {
                System.out.println(new Date(System.currentTimeMillis())+
                    " - Done executing Job " +jobId);
                break;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
@EventDriven @Polling (concurrentConsumers=1)
public class Worker implements ClusterInfoAware{

    public void setClusterInfo(ClusterInfo clusterInfo) {
        System.out.println("--------------- > setClusterInfo called");
        if (clusterInfo != null)
        {
            this.clusterInfo = clusterInfo;
            System.out.println("--------------- > Worker " +
                clusterInfo.getInstanceId() + " started");

            String total_members = gigaspace.getSpace().getURL().getProperty(SpaceURL.CLUSTER_TOTAL_MEMBERS);

            int partitions ;
            if (total_members != null)
                partitions =
                    Integer.valueOf(total_members .substring(0,total_members.indexOf(","))).intValue();
            else
                partitions =1;

            System.out.println("--------------- > "+ gigaspace.getSpace().getName() +
                " Space got " + partitions + " partitions ");
            routingValue = (clusterInfo.getInstanceId() - 1) % partitions ;

            System.out.println("--------------- > Worker "+  clusterInfo.getInstanceId() +
                " attached to Partition:"+ routingValue );
        }
        else
        {
            System.out.println("--------------- > Worker started in non clustered mode");
            routingValue = 0;
        }
    }

    private ClusterInfo clusterInfo;

    public Worker (){}

    private Integer routingValue;

    @GigaSpaceContext
    GigaSpace gigaspace;

    public void afterPropertiesSet() throws Exception {

    }

    @EventTemplate
    Request template() {
        Request template = new Request();
        template.setRouting(routingValue);
        return template;
    }

    @ReceiveHandler
    ReceiveOperationHandler receiveHandler() {
        SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
        return receiveHandler;
    }

    @SpaceDataEvent
    public Result execute(Request request) {
        //process Data here
        try {Thread.sleep(1000);} catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Result reponse = new Result ();
        reponse.setJobID(request.getJobID());
        reponse.setTaskID(request.getTaskID());
        reponse.setRouting(request.getRouting());
        return reponse;
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd">

    <!-- Enable scan for OpenSpaces and Spring components -->
    <context:component-scan base-package="org.openspaces.example.masterworker.blocking"/>
    <!-- Enable support for @Polling annotation -->
    <os-events:annotation-support />
    <os-core:space-proxy id="space" name="mySpace" />
    <os-core:giga-space id="gigaSpace" space="space"/>
    <os-core:giga-space-context/>
    <os-remoting:annotation-support/>
</beans>
@SpaceClass
public class Base {

    public Base (){}

    Integer jobID;
    String taskID;
    Object payload;
    Integer routing;

    @SpaceProperty(index=IndexType.BASIC)
    public Integer getJobID() {
        return jobID;
    }
    public void setJobID(Integer jobID) {
        this.jobID = jobID;
    }

    @SpaceId(autoGenerate=false)
    public String getTaskID() {
        return taskID;
    }
    public void setTaskID(String taskID) {
        this.taskID = taskID;
    }
    public Object getPayload() {
        return payload;
    }
    public void setPayload(Object payload) {
        this.payload = payload;
    }

    @SpaceRouting
    public Integer getRouting() {
        return routing;
    }
    public void setRouting(Integer routing) {
        this.routing = routing;
    }
}
@SpaceClass
public class Request extends Base{
    public Request (){}

}
@SpaceClass
public class Result extends Base {
    public Result (){}

}

Deploying the clustered space PU:

>gs deploy-space -cluster schema=partitioned total_members=2 mySpace

Deploying the Workers PU:

>gs deploy -cluster total_members=4 MasterWorker.jar

.NET Implementation

Here is an example of the Master Worker Pattern for .NET.

References