Kafka as a Data Source

Configure Confluent Kafka as a Data Source

To set up Confluent KafkaClosed as a data source, you first need to generate Confluent Cloud API keys and retrieve the relevant bootstrap server address:

  1. Login to your account in https://confluent.cloud/.

  2. Verify that you have created a cluster. In the following example, the cluster is called dih-poc:

  3. Navigate to the API keys page.

  4. Click Add key.

  5. Enter the account, scope, and details required for the API key and click Create API Key.

    The API key download page displays the new API key and secret.

  6. Click Download API key to save the API key and secret. After you click Complete, the API secret is no longer available.

  7. Navigate to the Cluster Settings page.

  8. Copy the bootstrap server address.

The API keys and bootstrap server addresses can be used either in code, or when creating a data source in SpaceDeckClosed.

For example, through the CLI:

properties.put("bootstrap.servers", "YOUR BOOTSTRAP SERVERS"); // Replace with your bootstrap servers properties.put("sasl.mechanism", "PLAIN"); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";"); properties.put("key.deserializer", StringDeserializer.class.getName()); properties.put("value.deserializer", StringDeserializer.class.getName()); properties.put("group.id", "my-consumer-group"); // Create the Kafka consumer Consumer<String, String> consumer = new KafkaConsumer<>(properties);

Create a Confluent Kafka Data Source

To create a Confluent Kafka data source:

  1. Create a space.

  2. Prepare venv to use https://github.com/giga-di/di-lnp/blob/main/kafka-writer/kafka_writer.py:

    python -m venv venv source venv/bin/activate venv/bin/pip install -r requirements.txt
  3. Write messages to kafka using a script - see https://github.com/giga-di/di-lnp/blob/main/kafka-writer/kafka_writer.py with commands like the following example (complex - for messages with complex type). This script generates messages only for demonstration. The messages can have a different structure.

    venv/bin/python kafka_writer.py http://localhost:8082 schema.complex 10 31 1000 complex venv/bin/python kafka_writer.py http://localhost:8082 schema.raw 0 10 1000 raw
  4. In SpaceDeck, under Data Source, create a new data source of type KAFKA.

  5. Enter the bootstrap server address retrieved previously.

  6. Select a table.

  7. Enter the API key details retrieved previously.

  8. Start a pipeline.

  9. Verify that data is sent to the space.

Kafka Message Schema Discovery

Kafka messages should be in JSON format. The schema of the JSON is automatically extracted.

For example:

JSON message:

{ "COUPONID": "BR2_000000000", "OFFERPROMOID": "candle_000", "EMAIL": "mail_000000001@gmail.com", "STR_EMPTY_ARRAY_PROP": [], "STR_ARRAY_PROP": [ "abcd", "1234", "DDD1", "ASD±!@" ], "INT_EMPTY_ARRAY_PROP": [], "INT_ARRAY_PROP": [ 1, 2, 3, 4 ], "INT_AS_STRING_ARRAY_PROP": [ "1", "2", "3", "4" ], "SHORT_EMPTY_ARRAY_PROP": [], "SHORT_ARRAY_PROP": [ 11, 22, 33, 44 ], "SHORT_AS_STRING_ARRAY_PROP": [ "11", "22", "33", "44" ], "DOUBLE_EMPTY_ARRAY_PROP": [], "DOUBLE_ARRAY_PROP": [ 11.292, 22.32323, 33.4234234, 44.543543 ], "DOUBLE_AS_STRING_ARRAY_PROP": [ "11.102", "22.3939", "33.3738", "44.00972" ], "LONG_EMPTY_ARRAY_PROP": [], "LONG_ARRAY_PROP": [ 11111, 222222, 33333, 444444 ], "LONG_AS_STRING_ARRAY_PROP": [ "11", "22", "33", "44" ], "BIG_INTEGER_EMPTY_ARRAY_PROP": [], "BIG_INTEGER_ARRAY_PROP": [ 1111111111, 222222222, 33333333, 444444444 ], "BIG_INTEGER_AS_STRING_ARRAY_PROP": [ "1111111", "222222", "33333333", "4444444444" ], "BIG_DECIMAL_EMPTY_ARRAY_PROP": [], "BIG_DECIMAL_ARRAY_PROP": [ 1111111111, 222222222.43424323, 33333333.123333, 444444444.4343242 ], "BIG_DECIMAL_AS_STRING_ARRAY_PROP": [ "1111111.4234234", "222222.4324234", "33333333.434343", "4444444444.423432" ], "B_BZ00_TOKEN": 1000, "BZ00_TOKEN": 1000, "BZ00_TAR_RISHUM": "2021-08-17", "BZ00_ZMAN_RISHUM": "16:22:47", "BZ00_PEULA": 36, "BZ00_SNIF": 508, "BZ00_LAKOACH": 4741, "BZ00_ARUTZ": 44, "BZ00_BANK_MESHADER": 318.123456789, "BZ00_SNIF_MESHADER": 28, "BZ00_MAHADURA": "KKKK", "BZ00_SUG_CHESHBON": 14, "BZ00_CHESHBON": 28733, "BZ00_PEULA_ONLINE": 78, "BZ00_EZH_OFI_CHN": "BB", "BZ00_EZH_OFI_SHIUH": 52, "BZ00_EZH_SUG_LAK": "K", "BZ00_TAT_ARUTZ": "H", "BZ00_SHIUCH_CHATIV": 23, "BZ00_YAMNAL": "290440.62", "BZ00_ISKI_PRATI": 222, "BZ00_MIKUM_PEILUT": "QQQQQQQQQQQQQQQ", "BZ00_SW_AMLMAOF": 0, "BZ00_LAK_YACHID": "W", "BZ00_SW_NEKUDOT": "F", "BZ00_HIZD_PEULA": "DD", "BZ00_TAALICH_PEULA": "true", "MAPTEST": { "name": "Moshe", "contact": "+972-9876543210", "city": "Jerusalem" }, "EMPTY_ARRAY_OF_MAPS_PROP": [], "ARRAY_OF_MAPS_PROP": [ { "name": "Moshe", "contact": "+972-9876543210", "city": "Jerusalem" }, { "name": "Yosi", "contact": "+972-9876540000", "city": "Herzlia" }, { "name": "Jonatan", "contact": "+972-310101010", "city": "Haifa" } ], "LOCAL_DATE_EMPTY_ARRAY_PROP": [], "LOCAL_DATE_ARRAY_PROP": [ "2021-08-18", "2021-09-19", "2021-10-11", "2021-12-01" ], "LOCAL_DATE_TIME_EMPTY_ARRAY_PROP": [], "LOCAL_DATE_TIME_ARRAY_PROP": [ "2021-08-18T17:39:34.556153000", "2021-09-19T17:39:34.556153000", "2021-10-11T17:39:34.556153000", "2021-12-01T17:39:34.556153000" ], "LOCAL_TIME_EMPTY_ARRAY_PROP": [], "LOCAL_TIME_ARRAY_PROP": [ "08:00:34", "17:09:00", "17:39:01", "17:39:35" ] }

Type created in the space: