Rapid Data Load
Author | XAP Version | Last Updated |
---|---|---|
Shay Hassidim | 7.0 | July 2009 |
Overview
In some cases you need to load data into the data grid very quickly, typically during development or in unit tests. You might not have a fast database on hand to load data from using the HibernateExternalDataSource implementation, or it might be easier to create a data generator utility that simulates the real-life data your application needs. A simple technique for loading data rapidly into the data grid is to use a DistributedTask implementation that generates the data and writes it into the collocated space. The generated data is constructed so that its routing field value “matches” the collocated space, that is, each generated object routes to the partition that created it.
The Distributed Task is executed once for each Space class you have, which lets you load data in parallel across all partitions. The load is in effect parallel in two dimensions: across Space classes and across partitions.
Example
Here is a simple example. We have three Space classes: StockMktHist, LastPrice, and StockHist. All of them use the Currency field as their routing field. The Currency routing field is a String field whose values can be USD, EUR, GBP, and so on.
The createCurrencyGroups
A data generator utility class has a createCurrencyGroups() method that generates a hash map grouping the currencies that belong to the same partition, based on the hash code of the currency String. Here is a simple implementation of such a method:
static String currencies[] = { "AFN","ALL","AMD","ANG","AOA","ARS","AUD","AWG","AZN","BAM","BBD",
"BDT","BGN","BHD","BIF","BMD","BND","BOB","BRL","BSD","BTN",
"BWP","BYR","BZD","CAD","CVE","DZD","EUR","GBP","INR","KHR",
"KYD","MMK","NOK","USD","XAF","XCD","XOF" };
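// Currencies grouped by zero-based partition index; filled by createCurrencyGroups()
static Map<Integer, List<String>> currencyGroups;

// synchronized: several loader tasks may call this concurrently in the same JVM
public static synchronized void createCurrencyGroups(int maxPartitions) {
    if (currencyGroups != null)
        return;
    Map<Integer, List<String>> groups = new ConcurrentHashMap<Integer, List<String>>();
    for (String currency : currencies) {
        // The hash code of a three-letter uppercase code is positive,
        // so % never produces a negative group here
        int group = currency.hashCode() % maxPartitions;
        if (!groups.containsKey(group)) {
            groups.put(group, new ArrayList<String>());
        }
        groups.get(group).add(currency);
    }
    // Publish the map only after it is fully populated
    currencyGroups = groups;
}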
With the above implementation, we generate several lists of currencies, one for each partition, all maintained within currencyGroups.
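The grouping step can be tried on its own, without a running space. The following is a minimal sketch, not the utility's actual code: the class name CurrencyGroupingSketch and both helper methods are hypothetical. It uses Math.floorMod instead of % so the index stays non-negative for any String; for the three-letter codes used here the hash code is positive, so both give the same result. How the space itself maps a negative hash code to a partition is an assumption you should verify before reusing this for other routing values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CurrencyGroupingSketch {

    // Hypothetical helper: maps a routing value to a zero-based partition index.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    static int partitionIndex(String routingValue, int maxPartitions) {
        return Math.floorMod(routingValue.hashCode(), maxPartitions);
    }

    // Groups the given currencies by the partition index they map to.
    static Map<Integer, List<String>> groupByPartition(String[] currencies, int maxPartitions) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String currency : currencies) {
            groups.computeIfAbsent(partitionIndex(currency, maxPartitions), k -> new ArrayList<>())
                  .add(currency);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[] sample = { "USD", "EUR", "GBP", "CAD", "INR", "NOK" };
        // Print each partition index with the currencies that map to it
        groupByPartition(sample, 2).forEach((index, members) ->
                System.out.println("partition index " + index + " -> " + members));
    }
}
```

Note that a partition index has no entry in the map when no currency hashes to it, so callers should not assume every partition has a list.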
The getRandomCurrency
The data generator also has a getRandomCurrency method that returns a random currency for a given partition. It uses the currencyGroups we created above:
static Random random = new Random();

public static String getRandomCurrency(int partition) {
    // for a single space
    if (partition == 0)
        return currencies[random.nextInt(currencies.length)];
    // Partition IDs are one-based; the group keys are zero-based
    List<String> list = currencyGroups.get(partition - 1);
    return list.get(random.nextInt(list.size()));
}
The getRandomCurrency method is used by our data generator utility.
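To check that a randomly chosen currency really belongs to the requested partition, the selection can be exercised in plain Java. This is a sketch under assumptions: RandomCurrencySketch, its constructor, and the shortened currency list are hypothetical, and the partition index is computed as the hash code modulo the partition count, as in the grouping method above. Unlike the original, it fails with a clear message when no currency maps to a partition instead of hitting a NullPointerException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class RandomCurrencySketch {

    // Shortened, illustrative currency list
    static final String[] CURRENCIES = { "AUD", "CAD", "EUR", "GBP", "INR", "NOK", "USD" };

    private final Map<Integer, List<String>> groups = new HashMap<>();
    private final Random random;
    private final int maxPartitions;

    RandomCurrencySketch(int maxPartitions, long seed) {
        this.maxPartitions = maxPartitions;
        this.random = new Random(seed);
        // Group currencies by zero-based partition index
        for (String currency : CURRENCIES) {
            int index = Math.floorMod(currency.hashCode(), maxPartitions);
            groups.computeIfAbsent(index, k -> new ArrayList<>()).add(currency);
        }
    }

    // partition is one-based, like the cluster member ID used by the loader task
    String randomCurrency(int partition) {
        if (partition < 1 || partition > maxPartitions) {
            throw new IllegalArgumentException("partition out of range: " + partition);
        }
        List<String> group = groups.get(partition - 1);
        if (group == null) {
            throw new IllegalStateException("no currency maps to partition " + partition);
        }
        return group.get(random.nextInt(group.size()));
    }

    public static void main(String[] args) {
        RandomCurrencySketch sketch = new RandomCurrencySketch(2, 42L);
        for (int partition = 1; partition <= 2; partition++) {
            System.out.println("partition " + partition + " -> " + sketch.randomCurrency(partition));
        }
    }
}
```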
The LoaderRequest
The LoaderRequest execute method generates an array of objects of the requested type and writes it into the collocated space with a single writeMultiple call per batch:
public class LoaderRequest implements DistributedTask<String, String>{
public static final int RequestTypeLastPrice =1;
public static final int RequestTypeStockHist=2;
public static final int RequestTypeStockMktHist=3;
@TaskGigaSpace
transient GigaSpace gigaspace;
transient static int batchSize = 5000;
transient int partition ;
transient int maxPartitions ;
int amount;
int requestType;
public LoaderRequest (int amount , int requestType)
{
this.amount=amount;
this.requestType=requestType;
}
@Override
public String execute() throws Exception {
System.out.println(System.currentTimeMillis() + " " +
Thread.currentThread().getName() + "-------> Loading Data into Partition:"
+gigaspace.getSpace().getURL().getMemberName() + " - requestType:"+ requestType);
// for a single space
if (gigaspace.getSpace().getURL().getProperty(SpaceURL.CLUSTER_SCHEMA) == null)
{
partition =1;
maxPartitions =1;
}
else
{
partition = Integer.parseInt(gigaspace.getSpace().getURL().getProperty(SpaceURL.CLUSTER_MEMBER_ID));
maxPartitions = Integer.parseInt(gigaspace.getSpace().getURL().getProperty(SpaceURL.CLUSTER_TOTAL_MEMBERS));
}
DataGenerator.createCurrencyGroups(maxPartitions);
LastPrice lastPriceBatch[] = null;
StockHist stockHistBatch[] = null ;
StockMktHist stockMktHistBatch[]= null;
switch (requestType) {
case RequestTypeLastPrice :
{
lastPriceBatch = new LastPrice [batchSize ];
break;
}
case RequestTypeStockHist:
{
stockHistBatch = new StockHist [batchSize ];
break;
}
case RequestTypeStockMktHist :
{
stockMktHistBatch = new StockMktHist [batchSize ];
break;
}
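default:
{
System.out.println(" ========= Non supported type ========= ");
// Nothing can be loaded for an unknown type; report it instead of returning "OK"
return "Unsupported request type: " + requestType;
}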
}
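// Integer division: only full batches are written, so any remainder
// (amount % batchSize) is not loaded
int cycles= amount/batchSize;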
for (int i=0;i<cycles;i++)
{
switch (requestType) {
case RequestTypeLastPrice :
{
for (int j=0;j<batchSize;j++)
{
lastPriceBatch[j]=DataGenerator.createLastPrice(partition);
}
gigaspace.writeMultiple(lastPriceBatch);
break;
}
case RequestTypeStockHist:
{
for (int j=0;j<batchSize;j++)
{
stockHistBatch[j]=DataGenerator.createStockHist(partition);
}
gigaspace.writeMultiple(stockHistBatch);
break;
}
case RequestTypeStockMktHist :
{
for (int j=0;j<batchSize;j++)
{
stockMktHistBatch[j] = DataGenerator.createStockMktHist(partition);
}
gigaspace.writeMultiple(stockMktHistBatch);
break;
}
}
}
return "OK";
}
@Override
public String reduce(List<AsyncResult<String>> result) throws Exception {
return "OK";
}
int routing;
@SpaceRouting
public Integer routing() {
return routing;
}
}
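Because execute() computes the number of cycles as amount / batchSize, only full batches are written: with a batch size of 5000, a request for 12000 objects loads 10000, and a request for fewer than 5000 loads none. If you need the exact amount, one option is to plan the batch sizes up front and make the last batch smaller. The sketch below shows only that arithmetic; BatchPlanSketch and batchSizes are hypothetical names and are not part of the loader above.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPlanSketch {

    // Returns the size of each batch needed to write exactly `amount` objects,
    // using full batches of `batchSize` followed by one smaller final batch if needed.
    static List<Integer> batchSizes(int amount, int batchSize) {
        if (amount < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("amount must be >= 0 and batchSize > 0");
        }
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = amount; remaining > 0; remaining -= batchSize) {
            sizes.add(Math.min(batchSize, remaining));
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Prints [5000, 5000, 2000]
        System.out.println(batchSizes(12000, 5000));
    }
}
```

In the loader, each planned size would correspond to one array allocation and one writeMultiple call.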
The Client Application
The client application creates one LoaderRequest object for each Space class and executes it. Each LoaderRequest is sent in parallel to all running partitions, so all three types of objects are loaded into all partitions simultaneously:
GigaSpace gigaSpace = new GigaSpaceConfigurer(new UrlSpaceConfigurer("jini://*/*/space").space()).gigaSpace();
AsyncFuture<String> future1 = gigaSpace.execute(new LoaderRequest(objectsToLoad,LoaderRequest.RequestTypeLastPrice));
AsyncFuture<String> future2 = gigaSpace.execute(new LoaderRequest(objectsToLoad,LoaderRequest.RequestTypeStockHist));
AsyncFuture<String> future3 = gigaSpace.execute(new LoaderRequest(objectsToLoad,LoaderRequest.RequestTypeStockMktHist));
String result1 = future1.get();
String result2 = future2.get();
String result3 = future3.get();
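The client submits all three requests before calling get() on any of them, which is what lets the loads overlap instead of running one after another. The same submit-first, wait-later pattern can be tried in plain Java. In this sketch CompletableFuture stands in for AsyncFuture, and the load method is a hypothetical placeholder for executing one loader request; none of it is GigaSpaces API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelSubmitSketch {

    // Hypothetical stand-in for running one loader request; returns its status.
    static String load(String requestType) {
        return requestType + ":OK";
    }

    // Submits every request first, then waits for the results in submission order.
    static List<String> loadAll(List<String> requestTypes) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, requestTypes.size()));
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String type : requestTypes) {
                futures.add(CompletableFuture.supplyAsync(() -> load(type), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join()); // blocks until this request has finished
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadAll(Arrays.asList("LastPrice", "StockHist", "StockMktHist")));
    }
}
```

Calling get() (or join()) right after each submission would serialize the loads, so keep the waiting in a separate step after all submissions.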