Synchronization Endpoint


A Cassandra-based implementation of the Space Synchronization Endpoint.

Library dependencies

The Cassandra Space Synchronization Endpoint uses the Hector library to communicate with the Cassandra cluster. Include the following in your pom.xml:

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-clientutil</artifactId>
    <version>1.1.6</version>
</dependency>

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-thrift</artifactId>
    <version>1.1.6</version>
</dependency>

<dependency>
    <groupId>org.hectorclient</groupId>
    <artifactId>hector-core</artifactId>
    <version>1.1-2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-jdk14</artifactId>
    <version>1.6.6</version>
</dependency>

Setup

The following example shows how the Cassandra Space Synchronization Endpoint can be configured within a mirror.

<?xml version="1.0"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/12.2/core/openspaces-core.xsd">

    <bean id="propertiesConfigurer"
       class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <bean id="hectorClient"
        class="org.openspaces.persistency.cassandra.HectorCassandraClientFactoryBean">

        <!-- comma separated seed list -->
        <property name="hosts" value="${cassandra.host}" />

        <!-- cassandra rpc communication port -->
        <property name="port" value="${cassandra.port}" />

        <!-- keyspace name to work with -->
        <property name="keyspaceName" value="${cassandra.keyspace}" />

    </bean>

    <bean id="cassandraSpaceSyncEndpoint"
        class="org.openspaces.persistency.cassandra.CassandraSpaceSynchronizationEndpointFactoryBean">

        <!-- configured above -->
        <property name="hectorClient" ref="hectorClient" />

    </bean>

    <os-core:mirror id="mirror" name="mirror-service"
        space-sync-endpoint="cassandraSpaceSyncEndpoint">
        <os-core:source-space name="space" partitions="${numOfPartitiones}" backups="${numOfBackups}"/>
    </os-core:mirror>

</beans>

The same mirror can also be configured in code:

HectorCassandraClient hectorClient = new HectorCassandraClientConfigurer()
    .clusterName(cluster)
    .hosts(cassandraHosts)
    .port(cassandraPort)
    .keyspaceName(cassandraKeyspaceName)
    .create();

SpaceSynchronizationEndpoint syncEndpoint = new CassandraSpaceSynchronizationEndpointConfigurer()
    .hectorClient(hectorClient)
    .create();

IJSpace mirror = new EmbeddedSpaceConfigurer("mirror-service")
    .schema("mirror")
    .spaceSynchronizationEndpoint(syncEndpoint)
    .addProperty("space-config.mirror-service.cluster.name", "space")
    .addProperty("space-config.mirror-service.cluster.partitions", String.valueOf(numOfPartitiones))
    .addProperty("space-config.mirror-service.cluster.backups-per-partition", String.valueOf(numOfBackups))
    .create();

For more details about different configurations see Space Persistency.

CassandraSpaceSynchronizationEndpoint Properties

Property                          Description
hectorClient                      A configured HectorCassandraClient bean. See Hector Cassandra Client.
fixedPropertyValueSerializer      See Property Value Serializer.
dynamicPropertyValueSerializer    See Property Value Serializer.
flattenedPropertiesFilter         See Flattened Properties Filter.
columnFamilyNameConverter         See Column Family Name Converter.

Property Value Serializer

By default when serializing object/document properties to column values, the following serialization logic is applied:

For fixed properties:

  • If the type of the value to be serialized matches a primitive type in Cassandra, it will be serialized as defined by the Cassandra primitive type serialization protocol.
  • Otherwise, the value will be serialized using the standard Java object serialization mechanism.

For dynamic properties:

It is possible to override this default behavior by providing a custom implementation of PropertyValueSerializer. This interface is defined by two methods:

ByteBuffer toByteBuffer(Object value);
Object fromByteBuffer(ByteBuffer byteBuffer);

The behavior of overriding the serialization logic is different for fixed properties and dynamic properties:

  • Fixed properties will only be serialized by the custom serializer if their type does not match a primitive type in Cassandra.
  • Dynamic properties will always be serialized using the provided implementation, so the implementation must be able to handle primitive types such as Integer, Long, etc.

Overriding the property value serializers in the Cassandra Space Synchronization Endpoint must be followed by overriding the same serializers in the Cassandra Space Data Source. Failure to do so will prevent the Cassandra Space Data Source from properly deserializing values read from Cassandra.
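
As an illustration of the two-method contract described above, here is a minimal sketch of a custom serializer that could serve dynamic properties. The PropertyValueSerializer interface is redeclared locally only so that the example compiles on its own; TaggedPropertyValueSerializer and its one-byte type tags are hypothetical and are not part of the library. The sketch handles Integer, Long and String values and rejects everything else.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the two methods listed above, so the sketch compiles
// on its own. In a real project, implement the library's PropertyValueSerializer.
interface PropertyValueSerializer {
    ByteBuffer toByteBuffer(Object value);
    Object fromByteBuffer(ByteBuffer byteBuffer);
}

// Hypothetical serializer: one type-tag byte followed by the encoded value.
class TaggedPropertyValueSerializer implements PropertyValueSerializer {
    private static final byte TAG_INT = 0;
    private static final byte TAG_LONG = 1;
    private static final byte TAG_STRING = 2;

    @Override
    public ByteBuffer toByteBuffer(Object value) {
        ByteBuffer buffer;
        if (value instanceof Integer) {
            buffer = ByteBuffer.allocate(1 + Integer.BYTES).put(TAG_INT).putInt((Integer) value);
        } else if (value instanceof Long) {
            buffer = ByteBuffer.allocate(1 + Long.BYTES).put(TAG_LONG).putLong((Long) value);
        } else if (value instanceof String) {
            byte[] utf8 = ((String) value).getBytes(StandardCharsets.UTF_8);
            buffer = ByteBuffer.allocate(1 + utf8.length).put(TAG_STRING).put(utf8);
        } else {
            // Also reached for null values, which this sketch does not support.
            throw new IllegalArgumentException("Unsupported value: " + value);
        }
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    @Override
    public Object fromByteBuffer(ByteBuffer byteBuffer) {
        ByteBuffer in = byteBuffer.duplicate(); // leave the caller's position untouched
        byte tag = in.get();
        switch (tag) {
            case TAG_INT:
                return in.getInt();
            case TAG_LONG:
                return in.getLong();
            case TAG_STRING:
                byte[] utf8 = new byte[in.remaining()];
                in.get(utf8);
                return new String(utf8, StandardCharsets.UTF_8);
            default:
                throw new IllegalArgumentException("Unknown type tag: " + tag);
        }
    }
}
```

The type tag is what lets fromByteBuffer recover the original Java type from an untyped column value. An implementation along these lines would be supplied through the dynamicPropertyValueSerializer property listed earlier, and the same class would also have to be configured on the Cassandra Space Data Source.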

Flattened Properties Filter

Introduction

When a type is introduced to the Cassandra Space Synchronization Endpoint, the type’s fixed properties are introspected, and the final result is a mapping from this type’s nested properties to column family columns. The default behavior of this mapping is explained in the following example. Consider the following simple POJO (these could also be a SpaceDocument’s fixed properties):

// implementation omitted for brevity
@SpaceClass
public class Person {

    @SpaceId
    public Long getId() ...

    public String getName() ...

    public Address getAddress() ...

    ...

}

public class Address {

    public String getStreetName() ...

    public Long getStreetNumber() ...

}

By default, the fixed properties will be mapped to the Person column family in Cassandra like this:

Property                       Column Name (and type)
person.id                      (row key) (type: Long)
person.name                    name (type: UTF8)
person.address.streetName      address.streetName (type: UTF8)
person.address.streetNumber    address.streetNumber (type: Long)

Notice how the address property was flattened: each of its nested properties is written as a separate column.

Now suppose that a Person is written to the space as a SpaceDocument which also includes these dynamic properties:

  • String newName
  • Address newAddress

By default, dynamic properties are not flattened and are written as is to Cassandra. Moreover, their static type is not updated in the column family metadata, and they are serialized using a custom serializer (see Property Value Serializer).

This is how they will be written to Cassandra:

Property             Column Name (and type)
person.newName       newName (type: Bytes)
person.newAddress    newAddress (type: Bytes)

Customization

It is possible to override the above behavior by providing a FlattenedPropertiesFilter implementation. The implementation is used during type introspection when a type is first introduced to the synchronization endpoint, and whenever an entry of that type that contains dynamic properties is written.

The interface is defined by a single method:

boolean shouldFlatten(PropertyContext propertyContext);

The return value indicates whether the property currently being introspected should be flattened, meaning that its nested properties are introspected as well, or serialized as is. In the example above, the default implementation, DefaultFlattenedPropertiesFilter, returns true if and only if the property is fixed and the current introspection nesting level does not exceed 10.

The PropertyContext contains the following details about the current introspected property:

String getPath();
String getName();
Class<?> getType();
boolean isDynamic();
int getCurrentNestingLevel();
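
To make the contract above concrete, the following is a minimal sketch of a custom filter. PropertyContext and FlattenedPropertiesFilter are redeclared locally, with the signatures listed above, only so that the example compiles on its own; ShallowFlattenedPropertiesFilter and its maxNestingLevel parameter are hypothetical. The sketch applies the same rule that is described for the default implementation, but with a configurable depth instead of the fixed limit of 10.

```java
// Local stand-ins mirroring the signatures listed above, so the sketch compiles
// on its own. In a real project, use the library's own interfaces.
interface PropertyContext {
    String getPath();
    String getName();
    Class<?> getType();
    boolean isDynamic();
    int getCurrentNestingLevel();
}

interface FlattenedPropertiesFilter {
    boolean shouldFlatten(PropertyContext propertyContext);
}

// Hypothetical filter: flattens fixed properties only, up to a configurable depth.
class ShallowFlattenedPropertiesFilter implements FlattenedPropertiesFilter {
    private final int maxNestingLevel;

    ShallowFlattenedPropertiesFilter(int maxNestingLevel) {
        this.maxNestingLevel = maxNestingLevel;
    }

    @Override
    public boolean shouldFlatten(PropertyContext propertyContext) {
        // Dynamic properties are never flattened; fixed properties are flattened
        // only while the nesting level does not exceed the configured maximum.
        return !propertyContext.isDynamic()
                && propertyContext.getCurrentNestingLevel() <= maxNestingLevel;
    }
}
```

A filter like this would be supplied through the flattenedPropertiesFilter property listed earlier. Properties for which it returns false are written as a single serialized column rather than as one column per nested property.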

Column Family Name Converter

Because of how Cassandra implements column families, certain limitations apply when converting a type name (e.g., com.example.data.Person) to a column family name. These include a maximum length of 48 characters and a set of characters that are not allowed in the name (such as ‘.’). The ColumnFamilyNameConverter interface defines how a type name is converted to a column family name when a column family is created. The interface has a single method:

String toColumnFamilyName(String typeName);

The default implementation is: DefaultColumnFamilyNameConverter.
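
As a rough illustration of the two limitations mentioned above, here is a minimal sketch of a custom converter. The ColumnFamilyNameConverter interface is redeclared locally only so that the example compiles on its own; UnderscoreColumnFamilyNameConverter is hypothetical, and its rules are an assumption made for this example, not a description of what DefaultColumnFamilyNameConverter does.

```java
// Local stand-in mirroring the single method listed above, so the sketch compiles
// on its own. In a real project, implement the library's ColumnFamilyNameConverter.
interface ColumnFamilyNameConverter {
    String toColumnFamilyName(String typeName);
}

// Hypothetical converter; not the behavior of DefaultColumnFamilyNameConverter.
class UnderscoreColumnFamilyNameConverter implements ColumnFamilyNameConverter {
    private static final int MAX_LENGTH = 48;

    @Override
    public String toColumnFamilyName(String typeName) {
        // Replace every character other than ASCII letters, digits and '_'
        // (for example '.' and '$') with an underscore.
        String sanitized = typeName.replaceAll("[^A-Za-z0-9_]", "_");
        // When the name is too long, keep the tail, which holds the simple class name.
        return sanitized.length() <= MAX_LENGTH
                ? sanitized
                : sanitized.substring(sanitized.length() - MAX_LENGTH);
    }
}
```

With this sketch, com.example.data.Person becomes com_example_data_Person. Truncation can map two long type names to the same column family name, so a converter intended for production use would need some way to keep the names unique.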

Considerations

  • Collections and Maps are not flattened; they are serialized as blobs using the Java object serialization mechanism.
  • Writing entries that only have their id property set is not supported. Such entries will not be written to Cassandra.