Aggregators


Many systems, such as pricing, risk management, trading, and other analytic and business intelligence applications, need to aggregate data stored in the data grid when generating reports or running business processes. Such aggregations can leverage data stored in memory and are therefore much faster than performing them against a database. XAP provides common functionality for performing aggregations across the space. There is no need to retrieve the entire data set from the space to the client side, iterate over it, and perform the aggregation there. That would be expensive, since it might transfer a large amount of data to the client application.

Built-in aggregators allow you to perform the entire aggregation at the space side, avoiding data retrieval back to the client side. Only the result of the aggregation performed on each partition is returned to the client side, where the partial results are reduced and returned to the client application. Such aggregation exploits the partitioned nature of the data grid: each partition executes the aggregation on its local data in parallel, and all of the partitions' intermediate results are then fully aggregated at the client side using the relevant reducer implementation.
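To make the partition-then-reduce flow concrete, here is a minimal plain-Java sketch. It is not the XAP API: ordinary lists stand in for partitions, and the hypothetical `Partial` record stands in for the small intermediate result each partition would send back. Only these partial results, never the entries themselves, reach the reducer.

```java
import java.util.List;
import java.util.LongSummaryStatistics;

/**
 * Illustrative sketch only: simulates "aggregate per partition, reduce at the client"
 * with plain Java lists standing in for partitions. Not the XAP API.
 */
final class PartitionedAggregation {

    /** Hypothetical partial result a single partition would send back. */
    record Partial(long min, long max, long sum, long count) {}

    /** Runs "on each partition" against its local data only. */
    static Partial aggregateLocally(List<Long> localValues) {
        LongSummaryStatistics stats = localValues.stream()
                .mapToLong(Long::longValue)
                .summaryStatistics();
        return new Partial(stats.getMin(), stats.getMax(), stats.getSum(), stats.getCount());
    }

    /** Runs once "at the client": merges the small partial results. */
    static Partial reduce(List<Partial> partials) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE, sum = 0, count = 0;
        for (Partial p : partials) {
            if (p.count() == 0) continue; // an empty partition contributes nothing
            min = Math.min(min, p.min());
            max = Math.max(max, p.max());
            sum += p.sum();
            count += p.count();
        }
        return new Partial(min, max, sum, count);
    }

    static Partial aggregate(List<List<Long>> partitions) {
        // In a real grid each partition would run aggregateLocally in parallel.
        List<Partial> partials = partitions.stream()
                .map(PartitionedAggregation::aggregateLocally)
                .toList();
        return reduce(partials);
    }
}
```

Min, max, sum, and count all merge by applying the same kind of operation to the partial results, so the reducer never needs the individual entries.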

How Aggregators Work

Aggregators are executed by iterating over the internal data grid structure that maintains the space objects. The original user objects are not materialized during this iteration (scan), which keeps the scan relatively fast. The aggregated fields (paths) do not need to be indexed; only the fields (paths) used by the query that selects the entries to scan benefit from indexes. Future XAP releases may use indexes to perform the aggregation.

Supported Aggregators


XAP comes with several built-in aggregators. When using a partitioned data grid, the aggregation is executed across all partitions; when using a replicated data grid, it is executed on the proxy's master replica. You may also route the aggregation to a specific partition. In addition, you may implement a custom aggregator that performs special aggregation logic on a given field (path) across the set of entries matching a query. Aggregators are specified via an AggregationSet, which may list one or more aggregators.


Name Description
min Returns the minimum value of a given field (path) across the data grid entries matching a given query.
max Returns the maximum value of a given field (path) across the data grid entries matching a given query.
count Returns the number of data grid entries matching a given query.
average Returns the average value of a given field (path) across the data grid entries matching a given query.
sum Returns the sum of a given field (path) across the data grid entries matching a given query.
groupby Returns a key/value map with the results of multiple aggregations, grouped by one or more given fields (paths), for the entries matching a given query. Similar to the SQL GROUP BY clause.
having Used to filter the groups produced by groupby. Similar to the SQL HAVING clause.
distinct Returns the distinct results for the entries matching a given query.
maxEntry Returns the entry (space object) with the maximum value of a given field (path) among the data grid entries matching a given query.
minEntry Returns the entry (space object) with the minimum value of a given field (path) among the data grid entries matching a given query.
custom A user-defined extension of SpaceEntriesAggregator. Returns the result of user-defined aggregation logic on a given field (path) across the entries matching a given query.
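Not every aggregator can be reduced by applying the same operation to the partial results. Average is the classic case: averaging each partition's average gives the wrong answer when partitions hold different numbers of matching entries. The sketch below uses hypothetical names and is not a description of XAP's internal reducer; it shows the usual remedy of shipping a (sum, count) pair per partition and dividing only at the end.

```java
import java.util.List;

/**
 * Sketch (assumption, not XAP internals): why a distributed average should ship
 * (sum, count) per partition instead of each partition's own average.
 */
final class DistributedAverage {

    record SumCount(double sum, long count) {}

    /** Partition-side partial result. */
    static SumCount partial(List<Double> local) {
        double sum = 0;
        for (double v : local) sum += v;
        return new SumCount(sum, local.size());
    }

    /** Correct: every entry is weighted equally by combining sums and counts. */
    static double average(List<SumCount> partials) {
        double sum = 0;
        long count = 0;
        for (SumCount p : partials) {
            sum += p.sum();
            count += p.count();
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    /** Incorrect: averaging per-partition averages ignores partition sizes. */
    static double averageOfAverages(List<SumCount> partials) {
        double total = 0;
        int nonEmpty = 0;
        for (SumCount p : partials) {
            if (p.count() == 0) continue;
            total += p.sum() / p.count();
            nonEmpty++;
        }
        return nonEmpty == 0 ? Double.NaN : total / nonEmpty;
    }
}
```

For example, with one partition holding {10} and another holding {20, 30, 40}, the true average is 100 / 4 = 25, while the average of the two partition averages is (10 + 30) / 2 = 20.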

Interoperability

Aggregators may be performed on data written by any type of client. For example, a Java application may run an aggregation on space objects that a .NET application wrote into the space using the XAP.NET API. Likewise, the .NET aggregation API can be used on data written into the space by a Java application.

Usage

Here are some aggregation examples using QueryExtension:

import static org.openspaces.extensions.QueryExtension.*;
import com.j_spaces.core.client.SQLQuery;
...
// space is an initialized GigaSpace proxy
SQLQuery<Person> query = new SQLQuery<Person>(Person.class, "country=? OR country=?");
query.setParameter(1, "UK");
query.setParameter(2, "U.S.A");

// Retrieve the maximum value stored in the field "age"
Long maxAgeInSpace = max(space, query, "age");
// Retrieve the minimum value stored in the field "age"
Long minAgeInSpace = min(space, query, "age");
// Sum the "age" field over all matching space objects
Long combinedAgeInSpace = sum(space, query, "age");
// Sum the "age" field over all matching space objects, then divide by their count
Double averageAge = average(space, query, "age");
// Retrieve the matching space object with the highest value for the field "age"
Person oldestPersonInSpace = maxEntry(space, query, "age");
// Retrieve the matching space object with the lowest value for the field "age"
Person youngestPersonInSpace = minEntry(space, query, "age");

import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceIndex;

@SpaceClass
public class Person {
    private Long id;
    private Long age;
    private String country;

    @SpaceId(autoGenerate=false)
    public Long getId() {
        return id;
    }

    public Person setId(Long id) {
        this.id = id;
        return this;
    }

    public Long getAge() {
        return age;
    }

    public Person setAge(Long age) {
        this.age = age;
        return this;
    }

    @SpaceIndex
    public String getCountry() {
        return country;
    }

    public Person setCountry(String country) {
        this.country = country;
        return this;
    }
}


Compound Aggregation


Compound aggregation executes multiple aggregation operations across the space in a single call and returns all of the results at once. When multiple aggregates are needed, the compound aggregation API is significantly faster than calling each individual aggregate.


import com.gigaspaces.query.aggregators.AggregationResult;
import com.gigaspaces.query.aggregators.AggregationSet;
import com.j_spaces.core.client.SQLQuery;
...
SQLQuery<Person> query = new SQLQuery<Person>(Person.class,"country=? OR country=? ");
query.setParameter(1, "UK");
query.setParameter(2, "U.S.A");

AggregationResult aggregationResult = space.aggregate(query,
		new AggregationSet().maxEntry("age").minEntry("age").sum("age")
			.average("age").minValue("age").maxValue("age"));

//retrieve result by index
Person oldest = (Person) aggregationResult.get(0);
Person youngest = (Person) aggregationResult.get(1);
Long sum = (Long) aggregationResult.get(2);
Double average = (Double) aggregationResult.get(3);

//retrieve result by string key
Long min = (Long) aggregationResult.get("minValue(age)");
Long max = (Long) aggregationResult.get("maxValue(age)");
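Conceptually, a compound aggregation visits each matching entry once and feeds it to every aggregator in the set, rather than rescanning the data per aggregate. The following plain-Java sketch illustrates that single-pass idea with hypothetical `Agg`, `sum`, and `maxValue` helpers. They only echo the alias style used above, such as "maxValue(age)", and are not the XAP classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/**
 * Sketch (hypothetical types, not the XAP API): several aggregations evaluated
 * in ONE scan over the data, with results retrievable by alias.
 */
final class CompoundScan {

    interface Agg<T> {
        String alias();
        void accept(T entry);
        Object result();
    }

    static <T> Agg<T> sum(String name, ToLongFunction<T> field) {
        return new Agg<>() {
            long sum;
            public String alias() { return "sum(" + name + ")"; }
            public void accept(T e) { sum += field.applyAsLong(e); }
            public Object result() { return sum; }
        };
    }

    static <T> Agg<T> maxValue(String name, ToLongFunction<T> field) {
        return new Agg<>() {
            Long max; // null until the first entry is seen
            public String alias() { return "maxValue(" + name + ")"; }
            public void accept(T e) {
                long v = field.applyAsLong(e);
                if (max == null || v > max) max = v;
            }
            public Object result() { return max; }
        };
    }

    /** Visits each entry once and feeds it to every aggregator. */
    static <T> Map<String, Object> run(List<T> entries, List<Agg<T>> aggs) {
        for (T e : entries) {
            for (Agg<T> a : aggs) a.accept(e);
        }
        Map<String, Object> byAlias = new LinkedHashMap<>(); // keeps the order aggregators were added
        for (Agg<T> a : aggs) byAlias.put(a.alias(), a.result());
        return byAlias;
    }
}
```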

Nested Fields Aggregation

Aggregation against the members of embedded space classes (nested field) is supported by supplying the field path while invoking the desired aggregate function.

import static org.openspaces.extensions.QueryExtension.*;
import com.j_spaces.core.client.SQLQuery;
...
SQLQuery<Person> query = new SQLQuery<Person>(Person.class, "state=? OR state=?");
query.setParameter(1, "NY");
query.setParameter(2, "CA");

// Retrieve the maximum value stored in the nested field "demographics.age"
Integer maxAgeInSpace = max(space, query, "demographics.age");

@SpaceClass
public class Person {
    private String id;
    private String name;
    private String state;
    private Demographics demographics;

    @SpaceId(autoGenerate = true)
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public Demographics getDemographics() {
        return demographics;
    }

    public void setDemographics(Demographics demographics) {
        this.demographics = demographics;
    }
}

public class Demographics implements java.io.Serializable {
    private Integer age;
    private char gender;

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public char getGender() {
        return gender;
    }

    public void setGender(char gender) {
        this.gender = gender;
    }
}
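A dotted path such as "demographics.age" is resolved one property at a time, from the outer object to the nested one. XAP performs this internally; the sketch below only illustrates the idea with standard Java reflection over JavaBean getters. `PathResolver` and its stand-in `Person`/`Demographics` classes are hypothetical, and returning null when an intermediate link is null is an assumption made for the sketch.

```java
import java.lang.reflect.Method;

/**
 * Sketch (assumption): resolving a dotted path such as "demographics.age"
 * against nested JavaBean getters with plain reflection. Not XAP's implementation.
 */
final class PathResolver {

    static Object getPathValue(Object root, String path) throws ReflectiveOperationException {
        Object current = root;
        for (String property : path.split("\\.")) {
            if (current == null) return null; // a null link ends the path (sketch assumption)
            String getter = "get" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
            Method m = current.getClass().getMethod(getter);
            current = m.invoke(current);
        }
        return current;
    }

    // Minimal hypothetical stand-ins for the Person/Demographics classes above.
    public static class Demographics {
        private final Integer age;
        public Demographics(Integer age) { this.age = age; }
        public Integer getAge() { return age; }
    }

    public static class Person {
        private final Demographics demographics;
        public Person(Demographics demographics) { this.demographics = demographics; }
        public Demographics getDemographics() { return demographics; }
    }
}
```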

Group Aggregation

The GroupByAggregator is used in conjunction with the aggregate functions to group the result-set by one or more columns. Here is an example:

import static org.openspaces.extensions.QueryExtension.*;
import com.gigaspaces.query.aggregators.GroupByAggregator;
import com.gigaspaces.query.aggregators.GroupByFilter;
import com.gigaspaces.query.aggregators.GroupByResult;
import com.gigaspaces.query.aggregators.GroupByValue;
import com.j_spaces.core.client.SQLQuery;

// SELECT AVG(salary), MIN(salary), MAX(salary) FROM Employee WHERE age > 50 GROUP BY department, gender
SQLQuery<Employee> query = new SQLQuery<Employee>(Employee.class, "age > ?", 50);
GroupByResult groupByResult = groupBy(gigaSpace, query, new GroupByAggregator()
        .select(average("salary"), min("salary"), max("salary"))
        .groupBy("department", "gender"));

for (GroupByValue group : groupByResult) {
    // Read the group key values
    Department department = (Department) group.getKey().get("department");
    Gender gender = (Gender) group.getKey().get("gender");
    // Read the aggregated values by their default aliases
    double avgSalary = group.getDouble("avg(salary)");
    long maxSalary = group.getLong("max(salary)");
    long minSalary = group.getLong("min(salary)");
}
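For readers who think in Java collections, the query above corresponds roughly to the following stream pipeline. This is a local, single-JVM sketch with a hypothetical `Employee` record (department and gender are modeled as strings rather than the `Department`/`Gender` types above); it only shows the semantics of filtering, grouping by a composite key, and computing AVG/MIN/MAX per group.

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Plain Java streams sketch (not the XAP API) of:
 * SELECT AVG(salary), MIN(salary), MAX(salary) FROM Employee WHERE age > 50
 * GROUP BY department, gender
 */
final class GroupBySketch {

    // Hypothetical stand-in for the Employee class used above.
    record Employee(String department, String gender, int age, long salary) {}

    // Composite group key, playing the role of the (department, gender) group key.
    record Key(String department, String gender) {}

    static Map<Key, LongSummaryStatistics> groupBy(List<Employee> employees) {
        return employees.stream()
                .filter(e -> e.age() > 50)                              // WHERE age > 50
                .collect(Collectors.groupingBy(
                        e -> new Key(e.department(), e.gender()),       // GROUP BY department, gender
                        Collectors.summarizingLong(Employee::salary))); // AVG/MIN/MAX(salary)
    }
}
```

In a partitioned grid, each partition could build such per-group statistics locally, and the client could merge groups with equal keys, for example with `LongSummaryStatistics.combine`.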

You can also use a GroupByFilter to keep only the groups for which the filter condition is true, similar to the SQL HAVING clause.

// SELECT AVG(salary), COUNT(*) FROM Employee WHERE companyId = 10 GROUP BY department HAVING AVG(salary) > 18000
SQLQuery<Employee> query = new SQLQuery<Employee>(Employee.class, "companyId = 10");

GroupByResult groupByResult = groupBy(gigaSpace, query, new GroupByAggregator()
        .select(average("salary"), count()).groupBy("department")
        .having(new GroupByFilter() {
            @Override
            public boolean process(GroupByValue group) {
                return group.getDouble("avg(salary)") > 18000;
            }
        }));

for (GroupByValue group : groupByResult) {
    // Read the group key value
    Department department = (Department) group.getKey().get("department");
    // Read the aggregated values
    double avgSalary = group.getDouble("avg(salary)");
    long count = group.getLong("count(*)");
}
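A HAVING condition is only meaningful on a fully merged group, because a group's average on one partition can differ from its overall average. The following plain-Java sketch (hypothetical names, not the XAP API) therefore merges the per-partition groups first and applies the filter afterwards.

```java
import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Sketch (plain Java, not the XAP API): GROUP BY department HAVING AVG(salary) > threshold,
 * with per-partition partial groups merged before the HAVING filter is applied.
 */
final class HavingSketch {

    /** One partition's partial result: department -> salary statistics. */
    static Map<String, LongSummaryStatistics> partial(Map<String, List<Long>> local) {
        Map<String, LongSummaryStatistics> out = new HashMap<>();
        local.forEach((dept, salaries) -> {
            LongSummaryStatistics s = new LongSummaryStatistics();
            for (long v : salaries) s.accept(v);
            out.put(dept, s);
        });
        return out;
    }

    /** Client side: merge partial groups, then keep only groups passing the filter. */
    static Set<String> having(List<Map<String, LongSummaryStatistics>> partials, double minAvg) {
        Map<String, LongSummaryStatistics> merged = new HashMap<>();
        for (Map<String, LongSummaryStatistics> p : partials) {
            p.forEach((dept, s) ->
                    merged.computeIfAbsent(dept, k -> new LongSummaryStatistics()).combine(s));
        }
        Set<String> result = new TreeSet<>();
        merged.forEach((dept, s) -> {
            if (s.getAverage() > minAvg) result.add(dept); // HAVING AVG(salary) > minAvg
        });
        return result;
    }
}
```

For example, a department with salaries 20000 and 10000 on one partition and 30000 on another averages 15000 locally but 20000 overall, so it passes a threshold of 18000 only after merging.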

Distinct Aggregation

The DistinctAggregator is used to select the distinct combinations of values of one or more fields (columns). Here is an example:

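import static org.openspaces.extensions.QueryExtension.distinct;
import com.gigaspaces.query.aggregators.DistinctAggregator;
import com.j_spaces.core.client.SQLQuery;
import java.util.List;

// sandboxSpace is an initialized GigaSpace proxy
public void selectDistinct()
{
    SQLQuery<Person> query = new SQLQuery<Person>(Person.class, "");
    query.setProjections("lastName", "gender");

    // Each returned Person holds a distinct (lastName, gender) combination
    DistinctAggregator<Person> aggregator = new DistinctAggregator<Person>()
            .distinct("lastName", "gender");

    List<Person> persons = distinct(sandboxSpace, query, aggregator);
}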
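Distinct results need de-duplication at two levels, because the same combination of values can be stored on several partitions. The following plain-Java sketch (hypothetical `Person` and `LastNameGender` records, not the XAP API) de-duplicates each partition locally and then de-duplicates the union at the client.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Sketch (plain Java, not the XAP API): distinct on (lastName, gender), where every
 * partition de-duplicates locally and the client de-duplicates the union again.
 */
final class DistinctSketch {

    record Person(String firstName, String lastName, char gender) {}

    // Projection of the fields named in distinct("lastName", "gender").
    record LastNameGender(String lastName, char gender) {}

    /** Partition side: local de-duplication shrinks what is sent to the client. */
    static Set<LastNameGender> localDistinct(List<Person> local) {
        Set<LastNameGender> out = new LinkedHashSet<>();
        for (Person p : local) out.add(new LastNameGender(p.lastName(), p.gender()));
        return out;
    }

    /** Client side: the same pair may come from several partitions. */
    static Set<LastNameGender> distinct(List<List<Person>> partitions) {
        Set<LastNameGender> merged = new LinkedHashSet<>();
        for (List<Person> partition : partitions) merged.addAll(localDistinct(partition));
        return merged;
    }
}
```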

Routing

When running on a partitioned space, it is important to understand how routing is determined for SQL queries. If the routing property is part of the criteria expression with an equality operator and without ORs, its value is used for routing. In some scenarios you may want to execute the query on a specific partition without matching on the routing property. This can be done via the setRouting method:

// SELECT AVG(salary), COUNT(*) FROM Employee WHERE companyId = 10 GROUP BY department HAVING AVG(salary) > 18000
SQLQuery<Employee> query = new SQLQuery<Employee>(Employee.class, "companyId = 10");
// Execute the aggregation only on the partition that routing value 1 maps to
query.setRouting(1);

GroupByResult groupByResult = groupBy(gigaSpace, query, new GroupByAggregator()
        .select(average("salary"), count()).groupBy("department")
        .having(new GroupByFilter() {
            @Override
            public boolean process(GroupByValue group) {
                return group.getDouble("avg(salary)") > 18000;
            }
        }));

for (GroupByValue group : groupByResult) {
    // Read the group key value
    Department department = (Department) group.getKey().get("department");
    // Read the aggregated values
    double avgSalary = group.getDouble("avg(salary)");
    long count = group.getLong("count(*)");
}
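A routing value such as the one passed to `setRouting(1)` reaches exactly one partition because it is mapped deterministically to a partition. The sketch below assumes a simple hash-modulo scheme. The exact formula XAP uses is not given in this document, so treat `partitionFor` as a hypothetical illustration only.

```java
/**
 * Sketch (assumption): hash-based mapping of a routing value to one of N partitions.
 * Illustrates determinism only; it is not XAP's documented routing formula.
 */
final class RoutingSketch {

    /** Returns a 0-based partition index for the given routing value. */
    static int partitionFor(Object routingValue, int partitionCount) {
        if (partitionCount <= 0) throw new IllegalArgumentException("partitionCount must be positive");
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(routingValue.hashCode(), partitionCount);
    }
}
```

With 4 partitions, this scheme maps the Integer routing values 1 and 5 to the same partition (index 1), and the same value always maps to the same partition.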

See also:

SQL Query Routing

Custom Aggregation

You may extend the SpaceEntriesAggregator to execute user defined aggregation logic on a given field (path) and a given entries set based on a query.

The example below shows a String field concatenation aggregator: for each entry, it extracts the field (path) value and appends it to the values extracted so far.

The aggregate method is called within each partition. Here we keep the ConcatAggregator object (and its transient StringBuilder sb) alive for the duration of the scan so it can be reused to concatenate the values.

The aggregateIntermediateResult method is called at the client side, once for each partition's intermediate result. It is invoked on a separate aggregator instance created on the client side.

Executing the Aggregation logic:

AggregationResult result = gigaSpace.aggregate(query, new AggregationSet().add(new ConcatAggregator("name")));
String concatResult = result.getString("concat(name)");
System.out.println(concatResult);

The ConcatAggregator Aggregation logic extending the SpaceEntriesAggregator:

import com.gigaspaces.query.aggregators.SpaceEntriesAggregator;
import com.gigaspaces.query.aggregators.SpaceEntriesAggregatorContext;

public class ConcatAggregator extends SpaceEntriesAggregator<String> {

    private final String path;
    private transient StringBuilder sb;

    public ConcatAggregator(String path) {
        this.path = path;
    }

    @Override
    public String getDefaultAlias() {
        return "concat(" + path + ")";
    }

    @Override
    public void aggregate(SpaceEntriesAggregatorContext context) {
        String value = (String) context.getPathValue(path);
        if (value != null)
            concat(value);
    }

    @Override
    public String getIntermediateResult() {
        return sb == null ? null : sb.toString();
    }

    @Override
    public void aggregateIntermediateResult(String partitionResult) {
        // A partition with no matching values returns null; skip it
        if (partitionResult != null)
            concat(partitionResult);
    }

    private void concat(String s) {
        if (sb == null) {
            sb = new StringBuilder(s);
        } else {
            sb.append(',').append(s);
        }
    }
}

Detailed Flow:

The aggregate(SpaceEntriesAggregatorContext context) is called within each partition for each matching space object. The actual Aggregation is done within the instance members (in this case the transient StringBuilder sb). When all matching space objects have been scanned, the getIntermediateResult method is called to return the aggregation result of that partition (in this case - a string) back to the client (that is holding the clustered space proxy).

The proxy holds a separate instance of the ConcatAggregator custom aggregator. Whenever it receives an intermediate result from a partition, it calls aggregateIntermediateResult(String partitionResult). Once all partitions have returned their results, the proxy invokes the getFinalResult method to retrieve the final aggregation result. This method is not shown in the example above because its default implementation calls getIntermediateResult, which yields the correct value for most aggregation implementations. In some special cases you will need to implement getFinalResult yourself.
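The flow above can be simulated locally. The sketch below uses a hypothetical `MiniAggregator` base class that mirrors the method names of SpaceEntriesAggregator; it is not the real class, and it omits the path lookup by passing values directly. One `MiniConcat` instance runs per partition, and a separate client-side instance merges the intermediate results and returns `getFinalResult()`, which by default delegates to `getIntermediateResult()`.

```java
import java.util.List;

/** Hypothetical base class mirroring the SpaceEntriesAggregator method names (not the XAP class). */
abstract class MiniAggregator<T> {
    abstract void aggregate(Object pathValue);             // per matching entry, on each partition
    abstract T getIntermediateResult();                    // once per partition
    abstract void aggregateIntermediateResult(T partial);  // client side, once per partition result
    T getFinalResult() { return getIntermediateResult(); } // default behavior described above
}

final class MiniConcat extends MiniAggregator<String> {
    private StringBuilder sb;

    @Override
    void aggregate(Object pathValue) { if (pathValue != null) concat((String) pathValue); }

    @Override
    String getIntermediateResult() { return sb == null ? null : sb.toString(); }

    @Override
    void aggregateIntermediateResult(String partial) { if (partial != null) concat(partial); }

    private void concat(String s) {
        if (sb == null) sb = new StringBuilder(s); else sb.append(',').append(s);
    }
}

final class LifecycleDemo {
    /** Each inner list plays the role of one partition's matching values. */
    static String run(List<List<String>> partitions) {
        MiniConcat clientSide = new MiniConcat();          // the proxy's own instance
        for (List<String> partition : partitions) {
            MiniConcat partitionSide = new MiniConcat();   // one instance per partition
            for (String value : partition) partitionSide.aggregate(value);
            clientSide.aggregateIntermediateResult(partitionSide.getIntermediateResult());
        }
        return clientSide.getFinalResult();
    }
}
```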

See also:

For more examples see the Services & Best Practices Custom Aggregator

Change code without restarts

When executing an aggregation over the space, the aggregator code is loaded from the remote client and cached for future executions. Because the code is cached, modifications are ignored, and users are forced to restart the space whenever they modify the code.

Starting with 12.1, you can use the @SupportCodeChange annotation to tell the space that your code has changed. The space can store multiple versions of the same aggregator, which is ideal for supporting clients that use different versions of it.

For example, start by annotating your aggregator with @SupportCodeChange(id="1"). When the code changes, set the annotation to @SupportCodeChange(id="2"), and the space will load the new version.

@SupportCodeChange(id ="1")
public class ConcatAggregator extends SpaceEntriesAggregator<String> {

    private final String path;
    private transient StringBuilder sb;

    public ConcatAggregator(String path) {
        this.path = path;
    }

    @Override
    public String getDefaultAlias() {
        return "concat(" + path + ")";
    }

    @Override
    public void aggregate(SpaceEntriesAggregatorContext context) {
        String value = (String) context.getPathValue(path);
        if (value != null)
            concat(value);
    }

    @Override
    public String getIntermediateResult() {
        return sb == null ? null : sb.toString();
    }

    @Override
    public void aggregateIntermediateResult(String partitionResult) {
        // A partition with no matching values returns null; skip it
        if (partitionResult != null)
            concat(partitionResult);
    }

    private void concat(String s) {
        if (sb == null) {
            sb = new StringBuilder(s);
        } else {
            sb.append(',').append(s);
        }
    }
}

@SupportCodeChange(id = "2")
public class ConcatAggregator extends SpaceEntriesAggregator<String> {

    private final String path;
    private transient StringBuilder sb;

    public ConcatAggregator(String path) {
        this.path = path;
    }

    @Override
    public String getDefaultAlias() {
        return "concat(" + path + ")";
    }

    @Override
    public void aggregate(SpaceEntriesAggregatorContext context) {
        String value = (String) context.getPathValue(path);
        if (value != null)
            concat(value);
    }

    @Override
    public String getIntermediateResult() {
        return sb == null ? null : sb.toString();
    }

    @Override
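    public void aggregateIntermediateResult(String partitionResult) {
        // A partition with no matching values returns null; skip it
        if (partitionResult != null)
            concat(partitionResult);
    }

    private void concat(String s) {
        if (sb == null) {
            sb = new StringBuilder(s);
        } else {
            sb.append(':').append(s); // version 2: separator changed from ',' to ':'
        }
    }
}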
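The effect of the version id can be pictured as a cache keyed by class name and id. The toy model below is an assumption for illustration, not XAP's class-loading implementation: resolving the same id again returns the cached copy even if the code changed, while a new id triggers a fresh load and leaves the older version available.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Toy model (assumption, not XAP internals): loaded code cached per (class name, version id).
 * Same id -> cached copy is reused; new id -> fresh load, old versions stay cached.
 */
final class VersionedCodeCache {
    private final Map<String, Object> cache = new HashMap<>();
    private int loads;

    Object resolve(String className, String versionId, Supplier<Object> loader) {
        return cache.computeIfAbsent(className + "#" + versionId, key -> {
            loads++; // counts real (non-cached) loads
            return loader.get();
        });
    }

    int loadCount() { return loads; }
}
```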

Considerations

If the aggregator is called frequently, or if large, complex objects are used as return types, it is recommended to implement optimized serialization, such as Externalizable, for the returned value object, or to use a library such as Kryo.
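As an example of the first option, the hypothetical `SumCountResult` below implements the standard `java.io.Externalizable` interface and writes just its two primitive fields. The round trip through `ObjectOutputStream`/`ObjectInputStream` stands in for whatever transport carries the result; it does not show how XAP itself serializes aggregation results.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

/** Hypothetical intermediate-result value object with a hand-written wire format. */
final class SumCountResult implements Externalizable {
    private double sum;
    private long count;

    public SumCountResult() {} // Externalizable requires a public no-arg constructor

    SumCountResult(double sum, long count) { this.sum = sum; this.count = count; }

    double sum() { return sum; }
    long count() { return count; }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeDouble(sum); // write only the two primitives, in a fixed order
        out.writeLong(count);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        sum = in.readDouble(); // read back in the same order
        count = in.readLong();
    }

    /** Round-trips an instance through Java serialization, as a transport would. */
    static SumCountResult roundTrip(SumCountResult value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SumCountResult) in.readObject();
        }
    }
}
```

The public no-argument constructor is used to create the instance before readExternal restores its fields.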

See also:

For more information see Custom Serialization.