This section describes how to load data from the Data Grid into Spark and how to save Spark RDDs back to the Data Grid.
Creating the Data Grid RDD
To load data from the Data Grid, use SparkContext.gridRdd[R]. The type parameter R is a Data Grid model class. For example:
val products = sc.gridRdd[Product]()
Once the RDD is created, you can perform any generic Spark action or transformation on it, for example:
val products = sc.gridRdd[Product]()
println(products.count())
println(products.map(_.quantity).sum())
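The examples in this section use a Product model class without showing its definition. The sketch below is a plain-Scala stand-in, assuming the constructor order used in the saveToGrid example (an id, a description, a quantity and a featured flag); the field names id and description are hypothetical, and a real Data Grid model class also needs the setup described in Data Modeling. It computes the same count and quantity sum on a local Seq that the two Spark actions above compute across the cluster.

```scala
import scala.util.Random

// Hypothetical stand-in for the Data Grid model class used in this section.
// Field order follows Product(i, "Description of product " + i, ...) in the
// saveToGrid example; the names `id` and `description` are assumptions.
case class Product(id: Int, description: String, quantity: Int, featuredProduct: Boolean)

object LocalProductActions {
  // Fixed seed so the generated data is reproducible between runs.
  private val random = new Random(42)

  // Local counterpart of the RDD built in the saveToGrid example.
  val products: Seq[Product] = (1 to 1000).map { i =>
    Product(i, "Description of product " + i, random.nextInt(10), random.nextBoolean())
  }

  // Local counterparts of products.count() and products.map(_.quantity).sum()
  val count: Long = products.size.toLong
  val quantitySum: Long = products.map(_.quantity.toLong).sum

  def main(args: Array[String]): Unit = {
    println(count)
    println(quantitySum)
  }
}
```

On a real cluster the same expressions run distributed over the RDD partitions; only the data source differs.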
Saving RDD to the Data Grid
To save a Spark RDD to the Data Grid, use the RDD.saveToGrid method. It assumes that the type parameter T of your RDD[T] is a Space class; otherwise, an exception is thrown at runtime. For example:
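import scala.util.Random

// Create 1,000 Product objects with a random quantity (0 to 9) and a random featured flag
val rdd = sc.parallelize(1 to 1000).map(i => Product(i, "Description of product " + i, Random.nextInt(10), Random.nextBoolean()))
rdd.saveToGrid()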
Grid-Side RDD Filters
To query a subset of data from the Data Grid, use the SparkContext.gridSql[R](sqlQuery, args) method. The type parameter R is a Data Grid model class, the sqlQuery parameter is a native Data Grid SQL query, and args are the arguments bound to the ? placeholders in that query. For example, to load only products with a quantity greater than 10:
val products = sc.gridSql[Product]("quantity > 10")
You can also bind parameters in the SQL query:
val products = sc.gridSql[Product]("quantity > ? and featuredProduct = ?", Seq(10, true))
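Conceptually, the parameterized query above selects the same objects as the Scala predicate below. The difference is that the Data Grid evaluates the query where the data lives, so only matching objects are sent to Spark, instead of every Product being loaded and filtered afterwards. This is a plain-Scala sketch over a small in-memory sample; the Product fields id and description and the names matches and sample are hypothetical. It only illustrates how the two positional arguments in Seq(10, true) map, in order, to the two ? placeholders.

```scala
// Hypothetical model mirroring the fields referenced by the query.
case class Product(id: Int, description: String, quantity: Int, featuredProduct: Boolean)

object GridSqlFilterSketch {
  // Plain-Scala equivalent of "quantity > ? and featuredProduct = ?":
  // the first argument binds to the first ?, the second to the second ?.
  def matches(minQuantity: Int, featured: Boolean): Product => Boolean =
    p => p.quantity > minQuantity && p.featuredProduct == featured

  // Small in-memory sample standing in for the objects stored in the Data Grid.
  val sample: Seq[Product] = Seq(
    Product(1, "a", 5, true),
    Product(2, "b", 11, true),
    Product(3, "c", 20, false),
    Product(4, "d", 10, true),
    Product(5, "e", 42, true)
  )

  // Selects the same objects as
  // sc.gridSql[Product]("quantity > ? and featuredProduct = ?", Seq(10, true))
  val selected: Seq[Product] = sample.filter(matches(10, true))

  def main(args: Array[String]): Unit =
    selected.foreach(println)
}
```

Product 4 is excluded because its quantity is exactly 10 and the query uses a strict greater-than comparison.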
For more information about Data Grid SQL queries, refer to the Data Grid documentation.
Zip RDD with Grid SQL Data
You may want to transform an RDD by executing an SQL query for each of its elements. This can be done with the rdd.zipWithGridSql() method, which returns a new RDD of tuples (element, query result items).
Note that this might be an expensive operation, since a separate query is executed against the Data Grid for every element of the RDD.
For example, suppose you have an RDD of customers and want to associate each customer with their orders:
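import org.apache.spark.rdd.RDD

val customers: RDD[Customer] = ...
// Query executed against the Data Grid once for every customer
val query = "customerId = ?"
// Builds the arguments bound to the ? placeholder from each customer
val queryParamsConstructor = (c: Customer) => Seq(c.id)
val projections = None
val orders: RDD[(Customer, Seq[Order])] = customers.zipWithGridSql[Order](query, queryParamsConstructor, projections)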
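The pairing that zipWithGridSql performs can be pictured with plain Scala collections: for each customer, the query arguments are built with queryParamsConstructor, the query customerId = ? is run once, and the customer is paired with the matching orders. The sketch below is an in-memory model only, not the InsightEdge implementation; the Customer and Order fields, the gridOrders sample and the runQuery helper are all hypothetical. It also counts the queries to show why the operation can be expensive: one query per RDD element.

```scala
// Hypothetical model classes; only Customer.id and Order.customerId matter here.
case class Customer(id: Long, name: String)
case class Order(id: Long, customerId: Long, amount: Double)

object ZipWithGridSqlSketch {
  // In-memory stand-in for the orders stored in the Data Grid.
  val gridOrders: Seq[Order] = Seq(
    Order(100, 1, 9.99),
    Order(101, 2, 25.00),
    Order(102, 1, 4.50)
  )

  val customers: Seq[Customer] = Seq(Customer(1, "Ann"), Customer(2, "Bob"), Customer(3, "Cid"))

  // Same shape as in the example above: builds the arguments for "customerId = ?".
  val queryParamsConstructor: Customer => Seq[Any] = c => Seq(c.id)

  // Counts how many queries would be sent to the Data Grid.
  var queriesExecuted = 0

  // Hypothetical evaluator for the single query "customerId = ?".
  def runQuery(params: Seq[Any]): Seq[Order] = {
    queriesExecuted += 1
    gridOrders.filter(_.customerId == params.head)
  }

  // One query per element; each element is paired with its result items.
  val zipped: Seq[(Customer, Seq[Order])] =
    customers.map(c => (c, runQuery(queryParamsConstructor(c))))

  def main(args: Array[String]): Unit =
    zipped.foreach { case (c, orders) => println(s"${c.name}: ${orders.map(_.id)}") }
}
```

A customer without orders is still kept, paired with an empty result sequence.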
Controlling Spark Partitions
SparkContext.gridSql() takes an optional splitCount parameter, which defines the number of Spark partitions per Data Grid partition. This feature is limited to the bucketed grid model.
For more information, refer to Data Modeling.
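Since splitCount is the number of Spark partitions per Data Grid partition, the resulting RDD has gridPartitions × splitCount partitions; for example, a Data Grid with 4 partitions queried with splitCount = 3 yields 12 Spark partitions. The sketch below computes that total and, purely for illustration, shows one way a hypothetical range of 128 bucket ids could be divided into splitCount contiguous ranges. The bucket count, the function names and the splitting scheme are assumptions, not a description of how the bucketed grid model actually assigns buckets; see Data Modeling for that.

```scala
object SplitCountSketch {
  // Total Spark partitions: splitCount Spark partitions for every Data Grid partition.
  def sparkPartitions(gridPartitions: Int, splitCount: Int): Int = {
    require(gridPartitions > 0 && splitCount > 0, "both counts must be positive")
    gridPartitions * splitCount
  }

  // Hypothetical: divides bucket ids [0, bucketCount) into splitCount contiguous,
  // non-overlapping ranges whose sizes differ by at most one.
  def bucketRanges(bucketCount: Int, splitCount: Int): Seq[Range] = {
    require(splitCount > 0 && bucketCount >= splitCount, "need at least one bucket per split")
    val base = bucketCount / splitCount
    val extra = bucketCount % splitCount
    (0 until splitCount).map { i =>
      // The first `extra` ranges each get one additional bucket.
      val start = i * base + math.min(i, extra)
      val size = base + (if (i < extra) 1 else 0)
      start until (start + size)
    }
  }

  def main(args: Array[String]): Unit = {
    println(sparkPartitions(4, 3))
    bucketRanges(128, 3).foreach(r => println(s"[${r.start}, ${r.end})"))
  }
}
```

With 128 buckets and splitCount = 3, the ranges hold 43, 43 and 42 buckets, so the Spark partitions built from one Data Grid partition stay roughly balanced.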