InsightEdge enhances the Agile Spark pushdown predicate capability by leveraging our native advanced indexing mechanism, to retrieve only relevant data entries when running a query (filter). This ability to filter directly on the source (instead of on the target as is done in the vanilla Apache Spark architecture) dramatically improves performance and reduces network overhead.
Pushdown predicates are dependent on indexing the relevant fields. The indexing can be adjusted to improve performance further, as described in the Indexing section. You can also refer to the Data Modeling topic.
When executing a query in InsightEdge, the following predicates are pushed down to the grid level.
Basic equality comparisons:
For more information about the geospatial predicates, refer to the GeoSpatial API topic.
To use the pushdown predicates with RDD, you can use the gridSql API as shown in the following example. With this API, the queries are performed in the data grid.
val rdd1 = sc.gridSql[Product]("quantity > 0")... val rdd2 = sc.gridSql[Product]("quantity > ? and featuredProduct = ?", Seq(10, true))
When using the geospatial API with the RDD API, the predicate is pushed down to the grid and the filtering is done on the data grid side. The gridSql API receives a Data Grid SQL query.
val rdd = sc.gridSql[SpatialData]("location spatial:within ?", Seq(circle(point(10,10), 10)))
For more information about these types of queries, refer to the RDD API topic.
When using the DataFrame or DataSet APIs, filters are performed in the client side (executors). If you choose to work with these APIs, use the regular syntax of the filter and where APIs. InsightEdge pushes down the supported predicates to the grid level.
//DataFrame val df = spark.read.grid[Product].where("quantity > 0") //Dataset val ds = spark.read.grid[Product].as[Product].where("quantity > 0")
When using the custom function syntax for filtering, the filtering itself is performed on the client side, not in the data grid.
//This does not use pushdown predicates and is performed in the client side. spark.read.grid[Product].as[Product].filter(p => p.quantity > 0).count()
Although this API is supported, the predicates are not pushed down to the data grid and therefore filtering is done on the client side.
val df = sqlContext.read.grid[SpatialData] val data = df.filter(df("location") geoWithin circle(point(10,10), 10))
Currently, Apache Spark does not push down aggregation queries (
count, etc.) to the data source. When this functionality becomes available in Apache Spark, it can also be used in InsightEdge.