How to send and receive data to and from the Kafka Broker of the Platform?
Introduction
You can insert data into the ontologies of the platform using Kafka. To do so, you must enable that option when creating the ontology.
The insertion from Kafka into the platform is done through the Digital-Broker module. Therefore, you also need to define a Digital Client in the platform with authorization to insert into the configured ontology. Go to the Digital Client option and create a new device, making sure to grant it an access level of type INSERT or ALL.
At this point, the ontology in the platform is ready to receive data from Kafka.
Kafka Producer
The following code shows a Kafka producer example implemented in Java. This code can be used to test the ontology configuration. It is one of the examples available in the source code repository (sources/examples/example-kafka-client).
Kafka producer example
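The original listing is kept in the repository; as a minimal sketch under some assumptions, such a producer might look like the following. The class name `ExampleWritingKafka` and the hard-coded JSON instance are illustrative, the connection values are taken from the consumer example later in this document, and the assumption that the topic name is the fixed prefix concatenated with the ontology name is inferred from the consumer's topic `ONTOLOGY_EXAMPLE_KAFKA`:

```java
package com.minsait.onesait.platform.examples.kafkaclient;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExampleWritingKafka {

    // Connection parameters: replace these with your own platform values.
    final static String BOOTSTRAP_SERVERS = "localhost:9095"; // Kafka instance of the platform
    final static String TOKEN = "02148604a7ed4a4986c973513d35cca3"; // device token
    final static String DEVICETEMPLATE = "KafkaInserts"; // Device Template with INSERT access
    final static String TOPIC_PREFIX = "ONTOLOGY_"; // fixed topic prefix (assumed)
    final static String ONTOLOGY = "EXAMPLE_KAFKA"; // target ontology

    private static Producer<String, String> createProducer() {
        final Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Same SASL/PLAIN security settings as the consumer example: the Device
        // Template acts as the user name and the device token as the password.
        config.put("security.protocol", "SASL_PLAINTEXT");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                        + DEVICETEMPLATE + "\" password=\"" + TOKEN + "\";");
        return new KafkaProducer<>(config);
    }

    public static void main(String[] args) {
        final Producer<String, String> producer = createProducer();
        // The repository example reads JSON instances from data.json;
        // here a single hard-coded instance stands in for that file.
        final String jsonInstance = "{\"example\":\"value\"}";
        producer.send(new ProducerRecord<>(TOPIC_PREFIX + ONTOLOGY, jsonInstance));
        // flush() blocks until all buffered records have been sent.
        producer.flush();
        producer.close();
    }
}
```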
At the beginning of the class, the connection parameters are specified:
The bootstrap servers parameter indicates the URL of the Kafka instance inside the platform.
The token parameter indicates the token of the device to be used.
The Device Template parameter configures the Device Template that will be used.
The topic prefix parameter indicates the prefix of the topic to be used; currently this is a fixed value.
The ontology parameter indicates the ontology that will be used. Note that the Device Template used needs authorization on this ontology.
The example basically loads some sample data from a file included in the source code repository (../sources/examples/example-kafka-client/src/main/resources/data.json) and inserts that data into the Kafka topic associated with the configured ontology. This example requires that the ontology and the device have been previously created. It is important to call flush() before the end of the program to make sure that all insertions have been completed.
The inserted instances must conform to the JSON schema defined for the ontology in the platform.
The next code snippet shows the libraries used by the example. To insert data using Kafka into the onesait Platform, the only mandatory dependency is the kafka-clients library. The slf4j and jackson libraries are used in this example for logging and JSON processing, respectively.
pom.xml
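The full pom.xml is kept in the repository; as a sketch, the dependency section might look like the following. The artifact choices for logging and JSON processing (slf4j-simple, jackson-databind) and all version numbers are illustrative assumptions, not the exact ones from the repository:

```xml
<dependencies>
    <!-- Mandatory: Kafka client library -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version> <!-- illustrative version -->
    </dependency>
    <!-- Used by the example for logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version> <!-- illustrative version -->
    </dependency>
    <!-- Used by the example for JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.6</version> <!-- illustrative version -->
    </dependency>
</dependencies>
```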
Kafka Consumer
At this point an example of a Kafka producer has been created, but you can also implement Kafka consumers for the Kafka broker of the platform. To do that, you can implement a normal Kafka consumer, taking into account a couple of extra considerations.
The first one is that, when you define the consumer configuration, you need to set the right security parameters. These parameters are similar to the ones used in the producer example.
The second one is that you should use a consumer group name different from the one used by the platform. We recommend using a name based on your application to avoid conflicts with other consumer groups.
The next code snippet shows an example of such configuration.
Kafka consumer configuration
private final static String TOPIC = "ONTOLOGY_EXAMPLE_KAFKA";
private final static String BOOTSTRAP_SERVERS = "localhost:9095";
// Different from the consumer group of the platform to avoid conflicts.
private final static String KAFKA_CONSUMER_GROUP = "myExampleConsumerGroup";
private final static String TOKEN = "02148604a7ed4a4986c973513d35cca3";
private final static String DEVICETEMPLATE = "KafkaInserts";

private static Consumer<String, String> createConsumer() {
    final Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP);
    // SASL/PLAIN security: the Device Template is the user name and the token is the password.
    config.put("security.protocol", "SASL_PLAINTEXT");
    config.put("sasl.mechanism", "PLAIN");
    config.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
            + DEVICETEMPLATE + "\" password=\"" + TOKEN + "\";");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Create the consumer using the properties.
    final Consumer<String, String> consumer = new KafkaConsumer<>(config);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

Remember that you will need to edit the ontology name, the Digital Client, and the token used.
The full consumer example can be seen in the following code snippet.
Kafka consumer
package com.minsait.onesait.platform.examples.kafkaclient;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExampleReadingKafka {

    private final static String TOPIC = "ONTOLOGY_EXAMPLE_KAFKA";
    private final static String BOOTSTRAP_SERVERS = "localhost:9095";
    // Different from the consumer group of the platform to avoid conflicts.
    private final static String KAFKA_CONSUMER_GROUP = "myExampleConsumerGroup";
    private final static String TOKEN = "02148604a7ed4a4986c973513d35cca3";
    private final static String DEVICETEMPLATE = "KafkaInserts";

    private static Consumer<String, String> createConsumer() {
        final Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP);
        config.put("security.protocol", "SASL_PLAINTEXT");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + DEVICETEMPLATE + "\" password=\"" + TOKEN + "\";");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Create the consumer using the properties.
        final Consumer<String, String> consumer = new KafkaConsumer<>(config);
        // Subscribe to the topic.
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }

    static void runConsumer() throws InterruptedException {
        final Consumer<String, String> consumer = createConsumer();
        // Stop after this many records have been consumed.
        final int giveUp = 350;
        int noRecordsCount = 0;
        while (true) {
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("Consumer Record:(%d, %d, %s, %s)\n",
                        record.partition(), record.offset(),
                        record.key(), record.value());
                noRecordsCount++;
            }
            consumer.commitAsync();
            if (noRecordsCount >= giveUp) {
                break;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        runConsumer();
    }
}
Full example
The full example can be seen in the source code repository and can also be downloaded from the following link: example-kafka-client.tar.gz