Running a Spark Session in a Processing Unit

If you want to run a Spark session directly inside a Processing Unit, GigaSpaces supports using SparkSession. This is the entry point for interacting with Spark, and it enables programming with the Dataset and DataFrame APIs.

You can initiate and run a SparkSession inside a Processing Unit with the GigaSpaces SparkSessionProvider.

Creating a SparkSession

The SparkSessionProvider.Builder class enables you to configure and create the SparkSession that you want to run in the Processing Unit. Its functionality is very similar to that of the generic SparkSession.builder() method. See the sample code provided here:

SparkSessionProvider sparkSessionProvider = new SparkSessionProvider.Builder()
       .master("spark://localhost:7077")
       .create();
SparkSessionProvider.Wrapper sparkSessionWrapper = sparkSessionProvider.getOrCreate();

The SparkSessionProvider.getOrCreate() method returns a SparkSessionProvider.Wrapper. If a wrapper already exists (because the method was previously called), the same wrapper is returned. Each call to the method increments a global reference counter.

The wrapper's close() method decrements the global reference counter, and closes the underlying SparkSession when the counter reaches zero.
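The following minimal sketch illustrates this reference-counting behavior, reusing the same placeholder master URL as above:

SparkSessionProvider sparkSessionProvider = new SparkSessionProvider.Builder()
       .master("spark://localhost:7077")
       .create();

// First call creates the wrapper and sets the reference counter to 1
SparkSessionProvider.Wrapper first = sparkSessionProvider.getOrCreate();
// Second call returns the same wrapper and increments the counter to 2
SparkSessionProvider.Wrapper second = sparkSessionProvider.getOrCreate();

first.close();  // counter drops to 1; the SparkSession remains open
second.close(); // counter drops to 0; the SparkSession is closed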

Example

Let’s assume we have a service that exposes a countLines method, as follows:

package com.mycompany.app;

public interface MyService {
   long countLines(String path);
}

An implementation that uses a SparkSession to perform the counting via the DataFrame API would look like this:

package com.mycompany.app;

import org.apache.spark.sql.SparkSession;
import org.insightedge.spark.SparkSessionProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import java.io.Closeable;
import java.io.IOException;

public class MyServiceImpl implements MyService, Closeable {
   private static final Logger logger = LoggerFactory.getLogger(MyServiceImpl.class);
   private SparkSessionProvider.Wrapper sparkSessionWrapper;


   @PostConstruct
   public void initialize() {

       // Use SparkSessionProvider.Builder() instead of SparkSession.builder() to create a Spark Session
       SparkSessionProvider sparkSessionProvider = new SparkSessionProvider.Builder()
               .master("spark://localhost:7077")
               .create();
       sparkSessionWrapper = sparkSessionProvider.getOrCreate();
   }

   @Override
   public long countLines(String path) {
       logger.info("Started to count rows of [" + path + "]");
       SparkSession sparkSession = sparkSessionWrapper.get();
       long totalRows = sparkSession.read().text(path).count();
       logger.info("Total rows in file: " + totalRows);
       return totalRows;
   }

   @Override
   public void close() throws IOException {
       if (sparkSessionWrapper != null)
           sparkSessionWrapper.close();
   }
}

You can then initialize the bean with a Spring @Configuration class, as follows:

package com.mycompany.app;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyConfiguration {

   @Bean
   public MyServiceImpl myService() {
       return new MyServiceImpl();
   }

}

Spark-related JARs are added to the classloader at runtime, when the SparkSessionProvider is first used. As a result, if you reference Spark classes before this class is initialized, you will get a ClassNotFound error. This can happen, for example, if a bean configured in the pu.xml file has a method that returns SparkSession.
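
For example, a bean like the following (a hypothetical class, shown only to illustrate the problem) would fail to load when the pu.xml is processed:

package com.mycompany.app;

import org.apache.spark.sql.SparkSession;

public class ProblematicBean {

   // The return type forces org.apache.spark.sql.SparkSession to be resolved
   // as soon as this class is loaded from the pu.xml, which happens before
   // SparkSessionProvider has added the Spark JARs to the classloader.
   public SparkSession getSparkSession() {
       return null; // body omitted; the failure occurs during class loading
   }
}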