When building low latency systems, a critical requirement is the ability to process incoming data as fast as possible, but also in the exact order in which the requests were submitted into the system. The system must not process requests associated with the same Order in parallel; it must process them in the exact order in which they were created on the client side.
Such a requirement is relevant to algorithmic trading engines, Order Management Systems, Market Data processing systems, high-speed inventory systems, etc.
Running multiple polling containers collocated with each partition allows us to scale at the partition level, forming a set of "virtual queues" that consume the data pushed into the partition in parallel, but still in the correct order. The number of polling containers will usually match the number of machine cores, which allows the machine CPUs to be used in the most efficient manner.
In our example we will simulate a simple Order Management processing system where Orders are sent into the system. An Order may have 5 states (state 1-5) that must be processed in the correct order by a "processor". Different Orders (from different clients) should be processed in parallel, but requests associated with the same Order MUST be processed in the exact order in which they were submitted by the end point client.
Here is an example of the latency measured for the Order request processing time:
The above results were obtained when running the Data-Grid with 4 partitions, each with a backup.
The Order Management Processor Example
The following example illustrates a simple Order management processor that includes the following artifacts:
The Order Class - This represents an Order request. An Order includes Symbol, requestType, id, orderId and bucketId fields (a sketch of this class appears after the bucketId formula below).
– The orderId field is used to partition the Order requests among the partitions (its getter method is annotated with @SpaceRouting).
– The id field is used as the space object ID (its getter is annotated with @SpaceId).
– The bucketId field is used to determine the bucket from which each polling container consumes Orders.
– The Order class is decorated with FIFO mode, which allows it to be consumed in a FIFO manner.
The Feeder - A client running multiple threads that push requests for different Orders.
The Processor - A polling container listener, running collocated with the Data-Grid. Each polling container consumes a different set of objects from its collocated space; each polling container template uses a different bucketId.
The Data-Grid - Stores the incoming Order requests until they are consumed by the polling containers. It provides the ability to scale the system and also provides high availability.
The ProcessorFactory - A Spring bean that creates the polling containers and their listeners. The polling containers consume data based on the Order Symbol; each is associated with a different bucketId derived from the Symbol. These run collocated with the Data-Grid (a sketch of this bean appears after the Processor code below).
The bucketId is calculated using the following formula:
(first char of Symbol hashcode) % (# of machine cores)
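The Order class itself is not included in the listings below. A minimal sketch, based on the description above, could look like the following; the sample symbols, the request type values representing the 5 order states, and the FifoSupport.OPERATION setting are assumptions, while the bucket calculation simply mirrors the formula above:

package com.gigaspaces.examples.parallelqueue;

import com.gigaspaces.annotation.pojo.FifoSupport;
import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceRouting;

// Hypothetical sketch of the Order class. Sample symbols and request types
// (the 5 order states) are illustrative values, not part of the original example.
@SpaceClass(fifoSupport = FifoSupport.OPERATION)
public class Order {

    public static String[] symbols = { "AAA", "BBB", "CCC", "DDD" };
    public static String[] requestTypes = { "state1", "state2", "state3", "state4", "state5" };

    private Integer id;
    private String orderId;
    private String symbol;
    private String requestType;
    private Integer bucketId;
    private long sendTime;

    // Space object ID
    @SpaceId(autoGenerate = false)
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    // Routing field - all requests for the same Order land in the same partition
    @SpaceRouting
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) {
        this.symbol = symbol;
        // bucketId = (first char of Symbol hashcode) % (# of machine cores)
        this.bucketId = symbol.charAt(0) % Runtime.getRuntime().availableProcessors();
    }

    public Integer getBucketId() { return bucketId; }
    public void setBucketId(Integer bucketId) { this.bucketId = bucketId; }

    public String getRequestType() { return requestType; }
    public void setRequestType(String requestType) { this.requestType = requestType; }

    public long getSendTime() { return sendTime; }
    public void setSendTime(long sendTime) { this.sendTime = sendTime; }

    @Override
    public String toString() {
        return "Order [id=" + id + ", orderId=" + orderId + ", symbol=" + symbol
                + ", requestType=" + requestType + ", bucketId=" + bucketId + "]";
    }
}

Note that orderId (the routing field) and id (the space ID) are kept as separate fields; the comment in the Feeder code below explains why.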
package com.gigaspaces.examples.parallelqueue;

import java.util.concurrent.atomic.AtomicInteger;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.space.UrlSpaceConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class Feeder implements Runnable {

    static GigaSpace space = null;
    static final int ORDERS_COUNT = 20;
    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) {
        // Connect to the remote space (the Data-Grid)
        space = new GigaSpaceConfigurer(new UrlSpaceConfigurer("jini://*/*/space")).gigaSpace();

        // Run several feeder threads in parallel, each simulating a different client
        int threadCount = 5;
        ThreadPoolTaskExecutor tpExecutor = new ThreadPoolTaskExecutor();
        tpExecutor.setCorePoolSize(threadCount);
        tpExecutor.setMaxPoolSize(threadCount);
        tpExecutor.setQueueCapacity(threadCount);
        tpExecutor.afterPropertiesSet();
        for (int i = 0; i < threadCount; i++) {
            tpExecutor.execute(new Feeder());
        }
        tpExecutor.destroy();
    }

    public void run() {
        System.out.println(Thread.currentThread() + " started");
        // Each thread writes ORDERS_COUNT Orders, pushing every request type (state) per Order
        for (int i = 0; i < ORDERS_COUNT; i++) {
            for (int j = 0; j < Order.requestTypes.length; j++) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                }
                Order o = new Order();
                // If we use the Order ID both for the @SpaceId and
                // @SpaceRouting fields we might end up updating the same object
                o.setOrderId(Thread.currentThread().getId() + "_" + i);
                o.setId(counter.incrementAndGet());
                o.setSymbol(Order.symbols[i % Order.symbols.length]);
                o.setSendTime(System.currentTimeMillis());
                String requestType = Order.requestTypes[j];
                o.setRequestType(requestType);
                space.write(o);
                System.out.println(Thread.currentThread() + " send Order " + o);
            }
        }
    }
}
package com.gigaspaces.examples.parallelqueue;

import org.openspaces.core.GigaSpace;
import org.openspaces.events.SpaceDataEventListener;
import org.springframework.transaction.TransactionStatus;

public class Processor implements SpaceDataEventListener<Order> {

    int bucket;
    int partitionID;

    Processor(int bucket, int partitionID) {
        this.bucket = bucket;
        this.partitionID = partitionID;
    }

    // Called by the polling container for every Order taken from this bucket;
    // prints how long the request waited before reaching the processor
    public void onEvent(Order order, GigaSpace space, TransactionStatus tx, Object arg3) {
        long time = System.currentTimeMillis();
        long latency = time - order.getSendTime();
        System.out.println("Time " + time + " partitionID " + partitionID + " Bucket " + bucket
                + " Got order: " + order + " Processing Time " + latency + " ms");
    }
}
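The ProcessorFactory code is not shown above. A minimal sketch, assuming the OpenSpaces SimplePollingContainerConfigurer API and a ClusterInfoAware Spring bean (the bean wiring, the gigaSpace property and the use of the machine core count as the number of buckets are assumptions), could look like this:

package com.gigaspaces.examples.parallelqueue;

import java.util.ArrayList;
import java.util.List;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.cluster.ClusterInfo;
import org.openspaces.core.cluster.ClusterInfoAware;
import org.openspaces.events.polling.SimplePollingContainerConfigurer;
import org.openspaces.events.polling.SimplePollingEventListenerContainer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

// Hypothetical ProcessorFactory sketch: creates one polling container per bucket,
// each with a template matching a different bucketId, collocated with the partition.
public class ProcessorFactory implements InitializingBean, DisposableBean, ClusterInfoAware {

    private GigaSpace gigaSpace;   // injected with the collocated (embedded) space proxy
    private ClusterInfo clusterInfo;
    private final List<SimplePollingEventListenerContainer> containers =
            new ArrayList<SimplePollingEventListenerContainer>();

    public void setGigaSpace(GigaSpace gigaSpace) {
        this.gigaSpace = gigaSpace;
    }

    public void setClusterInfo(ClusterInfo clusterInfo) {
        this.clusterInfo = clusterInfo;
    }

    public void afterPropertiesSet() throws Exception {
        int partitionId = (clusterInfo != null && clusterInfo.getInstanceId() != null)
                ? clusterInfo.getInstanceId() : 0;
        // One bucket (and one polling container) per machine core
        int buckets = Runtime.getRuntime().availableProcessors();
        for (int bucket = 0; bucket < buckets; bucket++) {
            // Template that matches only Orders belonging to this bucket
            Order template = new Order();
            template.setBucketId(bucket);
            SimplePollingEventListenerContainer container =
                    new SimplePollingContainerConfigurer(gigaSpace)
                            .template(template)
                            .eventListener(new Processor(bucket, partitionId))
                            .pollingContainer();
            containers.add(container);
        }
    }

    public void destroy() throws Exception {
        for (SimplePollingEventListenerContainer container : containers) {
            container.destroy();
        }
    }
}

Each polling container uses a single consumer thread by default, so Orders within a bucket are taken one at a time in FIFO order, while different buckets (and different partitions) are processed in parallel.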