DataFrames API
DataFrames provide an API for manipulating data within Spark. They offer a more user-friendly experience than pure Scala for common queries.
To read more about the DataFrames API, refer to the Spark documentation.
This section describes how to use the DataFrames API with the Data Grid.
Preparing
The entry point to Dataset features is the Spark SparkSession (which replaces the old SQLContext).
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// For implicit conversions, such as converting RDDs to DataFrames and Datasets
import spark.implicits._
// This is used to simplify calling Data Grid features
import org.insightedge.spark.implicits.all._
Person Case Class
For the following code snippets, we will use a simple case class named Person. This case class can be written to (and loaded from) the Data Grid or an external persisted storage.
import org.insightedge.scala.annotation._
import scala.beans.BeanProperty
case class Person(
@BeanProperty @SpaceId(autoGenerate = true) var id: String,
@BeanProperty var name: String,
@BeanProperty var age: Int ) {
def this() = this(null, null, -1)
}
Creating DataFrames
RDDs are stored in the Data Grid as collections of objects of a certain type. You can create a DataFrame from such a type using the following syntax:
val spark: SparkSession // An existing SparkSession.
import org.insightedge.spark.implicits.all._
val df = spark.read.grid[Person]
// Displays the content of the DataFrame to stdout
df.show()
Alternatively, register the class as a temporary table and query it with SQL:
val spark: SparkSession // An existing SparkSession.
spark.sql(
s"""
|create temporary table people
|using org.apache.spark.sql.insightedge
|options (class "${classOf[Person].getName}")
""".stripMargin)
val df = spark.sql("select * from people")
// Displays the content of the DataFrame to stdout
df.show()
Or use the generic data source format:
val spark: SparkSession // An existing SparkSession.
val df = spark.read
.format("org.apache.spark.sql.insightedge")
.option("class", classOf[Person].getName)
.load()
// Displays the content of the DataFrame to stdout
df.show()
If you want to load a DataFrame from SpaceDocuments written from a third-party application, refer to the Loading Custom SpaceDocuments section below.
Persisting DataFrames to the Data Grid
To write a DataFrame, use DataFrame.write. The content of the DataFrame is saved under a specified collection name.
The behavior of the write operation is controlled by the SaveMode.
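Spark defines four SaveMode values: Append, Overwrite, ErrorIfExists, and Ignore. As a rough illustration of how they differ, the sketch below writes to a local Parquet directory instead of the Data Grid. That stand-in is an assumption chosen so the example runs without a grid; whether the Data Grid data source honors every mode in the same way is not stated here and should be checked against your version. The object name, sample rows, and path are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeSketch {
  // Returns the row counts observed after an Overwrite, an Append, and an Ignore write.
  def run(spark: SparkSession): (Long, Long, Long) = {
    import spark.implicits._
    // Sample rows, for illustration only
    val df = Seq(("Ann", 31), ("Bob", 45)).toDF("name", "age")
    // Assumption: a local temp directory stands in for the grid collection
    val path = Files.createTempDirectory("save-mode-sketch").resolve("people").toString

    df.write.mode(SaveMode.Overwrite).parquet(path) // replaces any existing data
    val afterOverwrite = spark.read.parquet(path).count()

    df.write.mode(SaveMode.Append).parquet(path) // adds rows to the existing data
    val afterAppend = spark.read.parquet(path).count()

    df.write.mode(SaveMode.Ignore).parquet(path) // no-op, because data already exists
    val afterIgnore = spark.read.parquet(path).count()

    (afterOverwrite, afterAppend, afterIgnore)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("save-mode-sketch").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

SaveMode.ErrorIfExists, which is Spark's default when no mode is set, would fail the last write instead of skipping it, because the target already holds data.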
Since DataFrames are no longer linked to an object type, the content of the DataFrame is persisted under the specified collection name. Thus, when saving a DataFrame to the Data Grid, you must provide a collection name, and when loading a persisted DataFrame, you must use the same collection name instead of an object type.
To persist a DataFrame and load it back, you can use the following syntax:
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.insightedge.spark.implicits.all._
val spark: SparkSession // An existing SparkSession.
val df: DataFrame // An existing DataFrame
// Persist DataFrame into a collection named "people"
df.write.mode(SaveMode.Overwrite).grid("people")
// Load the DataFrame from the collection by name
val persisted = spark.read.grid("people")
// Displays the content of the DataFrame to stdout
persisted.show()
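Or use the generic data source format:
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val spark: SparkSession // An existing SparkSession.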
val df: DataFrame // An existing DataFrame
df.write
.format("org.apache.spark.sql.insightedge")
.mode(SaveMode.Overwrite)
.save("people")
val persisted = spark.read
.format("org.apache.spark.sql.insightedge")
.load("people")
// Displays the content of the DataFrame to stdout
persisted.show()
Nested properties are stored as DocumentProperties in the Data Grid when a DataFrame is persisted.
Persisted DataFrames are shared among multiple Spark jobs and stay alive after the jobs are complete.
Nested Properties
Let's enhance the Person class with an additional property, Address (which has some properties of its own).
import org.insightedge.scala.annotation._
import scala.beans.BeanProperty
case class Person(
@BeanProperty @SpaceId(autoGenerate = true) var id: String,
@BeanProperty var name: String,
@BeanProperty var age: Int,
@BeanProperty var address: Address ) {
def this() = this(null, null, -1, null)
}
case class Address(city: String, state: String)
If you write some Person objects to the Data Grid, e.g. by persisting an RDD, and then load them as a DataFrame, you can see that the DataFrame schema includes the nested properties:
import org.insightedge.spark.implicits.all._
val spark: SparkSession // An existing SparkSession.
val df = spark.read.grid[Person]
// Displays the schema of the DataFrame to stdout
df.printSchema()
This code prints the following schema:
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
Nested properties can be accessed using dot-separated syntax, e.g. address.city. Here is an example of filtering by nested properties:
val spark: SparkSession // An existing SparkSession.
val df = spark.read.grid[Person]
val countInBuffalo = df.filter(df("address.city") equalTo "Buffalo").count()
// Displays number of people from Buffalo
println(countInBuffalo)
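The same dot-separated path also works with Spark's standalone col function and the === operator. The sketch below is a minimal, grid-free illustration: it builds a small in-memory DataFrame in local mode, and the plain Person and Address case classes (without Data Grid annotations), the object name, and the sample rows are all stand-ins assumed for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Plain stand-ins for the document's classes, without grid annotations
case class Address(city: String, state: String)
case class Person(id: String, name: String, age: Int, address: Address)

object NestedFilterSketch {
  // Counts the sample people whose nested address.city equals the given city.
  def countInCity(spark: SparkSession, city: String): Long = {
    import spark.implicits._
    // Sample rows, for illustration only
    val df = Seq(
      Person("1", "Ann", 31, Address("Buffalo", "NY")),
      Person("2", "Bob", 45, Address("Albany", "NY"))
    ).toDF()
    // "address.city" resolves to the city field inside the address struct
    df.filter(col("address.city") === city).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("nested-filter-sketch").getOrCreate()
    println(countInCity(spark, "Buffalo"))
    spark.stop()
  }
}
```

With a DataFrame loaded from the Data Grid, only the filter line should need to carry over.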
You can also unwrap the nested properties, thus changing the DataFrame schema:
val spark: SparkSession // An existing SparkSession.
spark.read.grid[Person].createOrReplaceTempView("people")
val df = spark.sql("select address.city as city, address.state as state from people")
val countInBuffalo = df.filter(df("city") equalTo "Buffalo").count()
// Displays the schema of the DataFrame to stdout
df.printSchema()
// Displays number of people from Buffalo
println(countInBuffalo)
This code prints the following DataFrame schema:
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
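The SQL projection above can also be written with the DataFrame API by selecting the nested columns and aliasing them. The sketch below is a minimal local-mode illustration that assumes plain stand-in Person and Address case classes and a single sample row; the flatten helper and the object name are hypothetical and not part of any library.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Plain stand-ins for the document's classes, without grid annotations
case class Address(city: String, state: String)
case class Person(id: String, name: String, age: Int, address: Address)

object FlattenSketch {
  // Hypothetical helper: projects the nested address fields into top-level columns.
  def flatten(people: DataFrame): DataFrame =
    people.select(col("address.city").as("city"), col("address.state").as("state"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("flatten-sketch").getOrCreate()
    import spark.implicits._
    // Sample row, for illustration only
    val people = Seq(Person("1", "Ann", 31, Address("Buffalo", "NY"))).toDF()
    flatten(people).printSchema()
    spark.stop()
  }
}
```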
When a DataFrame is persisted, nested properties are stored as DocumentProperties in the Data Grid.
Controlling Spark Partitions
As with RDDs, to control the number of Spark partitions, set the splitCount option:
val spark: SparkSession // An existing SparkSession.
val df = spark.read.option("splitCount", "4").grid[Person]
The splitCount option defines the number of Spark partitions per Data Grid partition. This feature is limited to the bucketed grid model. Refer to Data Modeling for more details.
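Taking that description at face value, the total number of Spark partitions should be the number of Data Grid partitions multiplied by splitCount. The sketch below only does that arithmetic; expectedSparkPartitions is a hypothetical helper, not part of the InsightEdge API, and the grid partition count in the example is an assumed deployment value.

```scala
object SplitCountSketch {
  // Hypothetical helper: expected number of Spark partitions if each
  // Data Grid partition is split into `splitCount` Spark partitions.
  def expectedSparkPartitions(gridPartitions: Int, splitCount: Int): Int = {
    require(gridPartitions > 0 && splitCount > 0, "both counts must be positive")
    gridPartitions * splitCount
  }

  def main(args: Array[String]): Unit = {
    // Assumed deployment: 2 Data Grid partitions, read with splitCount = 4
    println(expectedSparkPartitions(gridPartitions = 2, splitCount = 4)) // prints 8
  }
}
```

On a live grid, the result can be compared with df.rdd.getNumPartitions on the loaded DataFrame.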
Loading Custom SpaceDocuments
If you have a third-party application writing SpaceDocuments into the grid, you have to provide a schema for the DataFrame manually. Nested properties must be supplied with class metadata, so that the DataFrame API can properly parse the SpaceDocuments (see the address field in the example below).
import org.insightedge.spark.implicits.all._
import org.apache.spark.sql.types._
val addressType = StructType(Seq(
StructField("state", StringType, nullable = true),
StructField("city", StringType, nullable = true)
))
val schema = StructType(Seq(
StructField("personId", StringType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("surname", StringType, nullable = true),
StructField("age", IntegerType, nullable = false),
StructField("address", addressType, nullable = true, nestedClass[Address])
))
val spark: SparkSession // An existing SparkSession.
val df = spark.read.schema(schema).grid("people")