Integrating Kafka with GigaSpaces Products

This topic describes how to implement the GigaSpaces connector for Apache Kafka, which has been verified by Confluent and is available as an add-on component in the GigaSpaces software package. Using the connector, you can integrate GigaSpaces products with Apache Kafka, using our write-behind data grid operations to Kafka to make data available for subscribers. Hadoop or any other data warehousing system can use the data for reporting and processing. GigaSpaces products can be implemented as the consumer by subscribing to Kafka messages, and as a producer by publishing Kafka messages. Both implementations are described below.

Introduction

Apache Kafka is a distributed publish-subscribe messaging system, which supports sending messages between applications, processes, and servers. A message is any kind of information that is sent from a producer (application that sends the messages) to a consumer (application that receives the messages).

Producers write their messages, or data, to Kafka topics. These topics are divided into partitions that function like logs. Each message is written to a partition and has a unique offset, or identifier. Consumers can specify a particular offset point where they can begin to read messages.

The Space as a Consumer

The Kafka persistence library provides a wrapper around the native Kafka Consumer API for the GigaSpaces-Kafka protocol serialization. For an example of how to use this wrapper, see the com.epam.openspaces.persistency.kafka.consumer.KafkaConsumer example in the <GS_HOME>/example module folder.

The Space sits behind the Kafka consumer service, and the data objects are passed to the Space instances using routing keys.

The Space as a Producer

The GigaSpaces-Kafka integration is done via the SpaceSynchronizationEndpoint interface deployed as a Kafka producer service. The service consumes a batch of data grid operations, converts them into custom Kafka messages, and sends these to the Kafka broker using the Kafka Producer API.

xap-kafka.jpg

The GigaSpaces-Kafka protocol represents the data and data grid operations. A message consists of the data grid operation type (Write, Update , Remove, etc.) and the actual data object. The data object itself may be represented either as a single object or as a SpaceDocument with key/value pairs. Kafka messages are sent via the network, so must be serialized into bytes. The default encoder utilizes the Java serialization mechanism, which implies Space classes (domain model) to be Serializable.

By default, Kafka messages are uniformly distributed across Kafka partitions. As such, even though data grid operations appear ordered in SpaceSynchronizationEndpoint, this doesn't imply the correct data processing ordering in Kafka consumers.

xap-kafka-ordering.jpg

Adding Kafka Streaming to the GigaSpaces Environment

This topic includes an example of GigaSpaces as a producer, which demonstrates how to configure Kafka persistence. It then shows how to implement a simple Kafka consumer, which pulls data from the Kafka broker and uses HsqlDB for storage.

Implementing GigaSpaces as a consumer is also explained along with how to configure the services via the pu.xml file, and how to customize the integration.

You need to install Kafka before installing the GigaSpaces connector.

Implementing the GigaSpaces Connector for Apache Kafka

The GigaSpaces connector for Apache Kafka is Confluent verified. We provide sample implementations of the connector as an open-source add-on to the GigaSpaces distribution.

To download and install the GigaSpaces-Kafka connector:

  1. Download the connector from Github.

  2. Unzip the connector package into an empty folder. The connector is located under <project_root>/example/.

  3. To start Zookeeper and the Kafka server, type the following commands:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
  4. To build the project, type the following command:

    cd <project_root>/example/dev-scripts
    ./rebuild.sh
  5. To deploy the connector, type the following command:

    cd <project_root>/example/dev-scripts
     ./deploy.sh
  6. Check the log files for messages from the Feeder and Consumer to confirm that the connector was installed successfully.

Configuring the Services

After you download and install the connector, you can configure the pu.xml with any necessary properties, such as

  • Kafka persistence
  • Kafka producer or consumer properties
  • Space class
  • Data modeling (SpaceDocument).

Kafka Persistence

Kafka persistence is essentially an implementation of the SpaceSynchronizationEndpoint. It takes a batch of data sync operations, converts them to a custom message protocol and sends them to the Kafka server using the Kafka Producer API.

Library Dependency

The following Maven dependency must be included in your project in order to use Kafka persistence. This artifact is built from the <project_rootd>/kafka-persistence source directory.

<dependency>
    <groupId>com.epam</groupId>
    <artifactId>kafka-persistence</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

Kafka Producer Service

You can configure the kafkaSpaceSynchronizationEndpoint as shown in the following example code, located under <project_root>/example/mirror/src/main/resources/META-INF/spring.where it is implemented as a GigaSpaces mirror service:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xmlns:os-sla="http://www.openspaces.org/schema/sla"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/9.1/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/9.1/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/9.1/remoting/openspaces-remoting.xsd
       http://www.openspaces.org/schema/sla http://www.openspaces.org/schema/sla/9.1/openspaces-sla.xsd">

    <bean id="propertiesConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:kafka.properties</value>
            </list>
        </property>
    </bean>

    <bean id="kafkaSpaceSynchronizationEndpoint" class="com.epam.openspaces.persistency.kafka.KafkaSpaceSynchronizationEndpointFactoryBean">
        <property name="producerProperties">
            <props>
                <!-- Kafka producer properties. Consult Kafka documentation for a list of available properties -->
                <prop key="metadata.broker.list">${metadata.broker.list}</prop>
                <prop key="request.required.acks">${request.required.acks}</prop>
            </props>
        </property>
    </bean>

    <!--
        The mirror space. Uses the Kafka external data source. Persists changes done on the Space that
        connects to this mirror space into the Kafka.
    -->
    <os-core:mirror id="mirror" url="/./mirror-service" space-sync-endpoint="kafkaSpaceSynchronizationEndpoint" operation-grouping="group-by-replication-bulk">
        <os-core:source-space name="space" partitions="2" backups="1"/>
    </os-core:mirror>

</beans>

For more information about the GigaSpaces mirror service, see the Asynchronous Persistency - Mirror Service topic in the Developer guide.

You can configure the Kafka processor service as shown in the following sample code, located under <project root>/example/processor/src/main/resources/META-INF/spring:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/9.1/core/openspaces-core.xsd">

    <!--
        Spring property configurer which allows us to use system properties (such as user.name).
    -->
    <bean id="propertiesConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <!--
        Enables the usage of @GigaSpaceContext annotation based injection.
    -->
    <os-core:giga-space-context/>

    <!--
        A bean representing a space (an IJSpace implementation).
    -->
    <os-core:space id="space" url="/./space" schema="default" mirror="true">
        <os-core:space-type type-name="Product">
            <os-core:id property="CatalogNumber"/>
            <os-core:basic-index path="Name"/>
            <os-core:extended-index path="Price"/>
        </os-core:space-type>
    </os-core:space>

    <!--
        OpenSpaces simplified space API built on top of IJSpace/JavaSpace.
    -->
    <os-core:giga-space id="gigaSpace" space="space" />
</beans>

Kafka Producer Properties

The following properties are the default applied to the Kafka producer in the GigaSpaces-Kafka protocol. You can override them if necessary.

Property Default Value Description
key.serializer.class com.epam.openspaces.persistency.kafka.
protocol.impl.serializer.KafkaMessageKeyEncoder
Message key serializer of the default Gigaspace-Kafka protocol.
serializer.class com.epam.openspaces.persistency.kafka.
protocol.impl.serializer.KafkaMessageEncoder
Message serializer of the default Gigaspace-Kafka protocol.

For a full list of available producer properties, see the Kafka Producer Configurations page in the Confluent documentation.

Kafka Consumer Service

You can configure the Kafka consumer service as shown in the following sample code, located under <project root>/example/consumer/src/main/resources/META-INF/spring:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                    <value>classpath:kafka.properties</value>
            </list>
        </property>
    </bean>

    <!-- Kafka Consumer, wrapper around native Kafka producer API -->
	<bean id="kafkaConsumer" class="com.epam.openspaces.persistency.kafka.consumer.KafkaConsumerFactoryBean">
		<property name="consumerProperties">
			<props>
                <!-- Kafka consumer properties. Consult Kafka documentation for a list of available properties -->
				<prop key="zookeeper.connect">${zookeeper.connect}</prop>
				<prop key="group.id">${group.id}</prop>
				<prop key="zookeeper.session.timeout.ms">${zookeeper.session.timeout.ms}</prop>
				<prop key="zookeeper.sync.time.ms">${zookeeper.sync.time.ms}</prop>
				<prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
			</props>
		</property>
	</bean>

    <!-- Consumer which implements business logic, consumes data from Kafka and saves to database using Hibernate -->
	<bean id="consumer" class="com.epam.consumer.Consumer">
		<property name="consumer" ref="kafkaConsumer" />
	</bean>

</beans>

Associating a Kafka Topic with a Space Class

In order to associate a Kafka topic with the domain model class, the class must be annotated with the @KafkaTopic annotation and declared as Serializable. See the following sample code.

@KafkaTopic("user_activity")
@SpaceClass
public class UserActivity implements Serializable {
    ...
}

Space Documents

To configure a Kafka topic for a SpaceDocument or Extended SpaceDocument, the KafkaPersistenceConstants.SPACE_DOCUMENT_KAFKA_TOPIC_PROPERTY_NAME property should be added to the document. See the following sample code.

public class Product extends SpaceDocument {

public Product() {
    super("Product");
    super.setProperty(SPACE_DOCUMENT_KAFKA_TOPIC_PROPERTY_NAME, "product");
}

You can also configure the name of the property that defines the Kafka topic for SpaceDocuments. Set the spaceDocumentKafkaTopicName to the required value, as shown in the sample code below.

<bean id="kafkaSpaceSynchronizationEndpoint" class="com.epam.openspaces.persistency.kafka.KafkaSpaceSynchronizationEndpointFactoryBean">
    ...
    <property name="spaceDocumentKafkaTopicName" value="topic_name" />
</bean>

Customizing the Integration

You can customize the GigaSpaces-Kafka protocol as necessary, to suit your specific use case and environment. The following guidelines may be helpful:

  • Kafka persistence was designed to be extensible and customizable.
  • If you want to create a custom protocol between your GigaSpaces product and Kafka, provide an implementation of AbstractKafkaMessage, AbstractKafkaMessageKey, and AbstractKafkaMessageFactory.
  • If you want to customize how the data grid operations are sent to Kafka, or how the Kafka topic is chosen for a given entity, provide an implementation of AbstractKafkaSpaceSynchronizationEndpoint.
  • If you want to create a custom serializer, look at KafkaMessageDecoder and KafkaMessageKeyDecoder.