¿Cómo enviar y recibir datos a y desde el Kafka Broker de la Plataforma?

¿Cómo enviar y recibir datos a y desde el Kafka Broker de la Plataforma?

Introducción

Puedes insertar datos en las ontologías de la plataforma usando Kafka. Para ello, debes marcar esa opción en la creación o edición de la ontología.

 

La inserción de Kafka en la plataforma se realiza mediante el módulo Digital-Broker. Por lo tanto, también es necesario definir un Cliente Digital en la plataforma con autorización para insertar en la ontología configurada. Ve a la opción Cliente Digital y crea un nuevo dispositivo. Asegúrate de otorgar el nivel de acceso del tipo INSERTAR o TODO.

En este momento, la ontología de la plataforma está lista para recibir datos de Kafka.

Producto Kafka

El siguiente código muestra un ejemplo de productor de Kafka implementado en Java. Este código puede ser usado para probar la configuración de la ontología. Es uno de los ejemplos disponibles en el repositorio de código fuente (sources/examples/example-kafka-client).

Ejemplo de productor de Kafka

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

package com.minsait.onesait.platform.examples.kafkaclient;

 

import java.io.IOException;

import java.io.InputStream;

import java.util.Date;

import java.util.Properties;

import java.util.function.Consumer;

 

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.StringSerializer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.fasterxml.jackson.databind.JsonNode;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.fasterxml.jackson.databind.node.ArrayNode;

 

public class ExampleInsertingKafka {

     

    private static final Logger log = LoggerFactory.getLogger(ExampleInsertingKafka.class);

     

    private static String url = "localhost:9095";

    private static String token = "02148604a7ed4a4986c973513d35cca3";

    private static String deviceTemplate = "KafkaInserts";

    private static String prefix = "ONTOLOGY_";

    private static String ontology = "EXAMPLE_KAFKA";

     

    private static Properties createConfig(String token, String clientPlatform) {

        Properties config = new Properties();

        config.put(ProducerConfig.CLIENT_ID_CONFIG, "localhost");

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);

        config.put("security.protocol", "SASL_PLAINTEXT");

        config.put("sasl.mechanism", "PLAIN");

        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        config.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""

                + clientPlatform + "\" password=\"" + token + "\";");

        return config;

    }

     

    private static JsonNode getInstances() throws IOException {

        ClassLoader classLoader = ExampleInsertingKafka.class.getClassLoader();

        InputStream dataInputStream = classLoader.getResourceAsStream("data.json");

        final ObjectMapper mapper = new ObjectMapper();

         

        JsonNode rootNode = mapper.readTree(dataInputStream);

         

        return rootNode;

    }

     

    private static void insertInstance(KafkaProducer<String, String> producer, String ontology, String instance, String prefix) {

        try {

            producer.send(new ProducerRecord<String, String>(prefix + ontology, instance));    

            //metadata.get();

        } catch (Exception e) {

            throw new RuntimeException("Error inserting data with kafka", e);

        }

 

    }

     

    public static void main (String...args) throws IOException, InterruptedException {         

        log.info("Starting insert example...");

        Properties config = createConfig(token, deviceTemplate);

        log.info("Configuraion stablished");

         

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        log.info("Kafka producer created");

         

        JsonNode allData = getInstances();

        log.info("Example instances loaded");

         

        Consumer<JsonNode> insertInstance = new Consumer<JsonNode>() {

            @Override

            public void accept(JsonNode instance) {

                String instanceString = instance.toString();

                insertInstance(producer, ontology, instanceString, prefix);

                log.info("instance inserted - {}: {}", new Date().getTime(), instanceString);

            }

        };

         

        if (allData.isArray()) {

            ArrayNode arrayInstances = (ArrayNode) allData;

            arrayInstances.forEach(insertInstance);

        }      

         

        producer.flush();

        producer.close();

    }

}

Al principio de la clase se especifican los parámetros de conexión.

  • La línea 24 indica la URL de la instancia de Kafka dentro de la plataforma.

  • La línea 25 indica el token del dispositivo a utilizar.

  • La línea 26 configura la Plantilla del Dispositivo (Device Template) que se utilizará.

  • La línea 27 indica el prefijo del tema (topic) que se va a utilizar. Actualmente este es un valor fijo.

  • La línea 28 indica la ontología que se utilizará. Cabe destacar que la Plantilla de Dispositivo utilizada necesita autorización para esta ontología.

El ejemplo carga básicamente algunos datos de ejemplo de un archivo que está incluido en el repositorio de código fuente (../sources/examples/example-kafka-client/src/main/resources/data.json) e inserta esos datos en el tópico Kafka asociado a la ontología configurada. Este ejemplo requiere que la ontología y el dispositivo hayan sido creados previamente. Es importante utilizar flush() antes del final del programa para asegurarse de que se han realizado todas las inserciones.

El esquema que la ontología espera es el siguiente:

json schema

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

{

    "$schema": "http://json-schema.org/draft-04/schema#",

    "title": "GestampMeasures",

    "type": "object",

    "required": [

        "value",

        "timeStamp",

        "signal",

        "deviceId",

        "profile",

        "deviceType"

    ],

    "properties": {

        "value": {

            "type": "string"

        },

        "timeStamp": {

            "type": "number"

        },

        "description": {

            "type": "string"

        },

        "signal": {

            "type": "string"

        },

        "deviceId": {

            "type": "string"

        },

        "profile": {

            "type": "string"

        },

        "deviceType": {

            "type": "string"

        },

        "signalId": {

            "type": "string"

        }

    },

    "description": "Ontology for inserting measures using kafka",

    "additionalProperties": true

}

El siguiente fragmento de código muestra las bibliotecas utilizadas por el ejemplo. Para insertar datos usando Kafka en la Plataforma onesait, la única dependencia obligatoria es la biblioteca kafka-clients. Las librerías slf4j y jackson se utilizan en este ejemplo para el registro y para el procesado de JSON, respectivamente

pom.xml

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.minsait.onesait.platform</groupId>

  <artifactId>example-kafka-client</artifactId>

  <version>1.0.0</version> 

  <dependencies>

    <dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>1.0.0</version>

    </dependency>

  <dependency>

    <groupId>org.slf4j</groupId>

    <artifactId>slf4j-simple</artifactId>

    <version>1.7.5</version>

  </dependency>

  <dependency>

      <groupId>com.fasterxml.jackson.core</groupId>

      <artifactId>jackson-core</artifactId>

      <version>2.9.5</version>

  </dependency>

  <dependency>

    <groupId>com.fasterxml.jackson.core</groupId>

    <artifactId>jackson-databind</artifactId>

    <version>2.9.5</version>

  </dependency>

  </dependencies>

   

  <build>

    <plugins>

      <plugin>

        <groupId>org.apache.maven.plugins</groupId>

        <artifactId>maven-compiler-plugin</artifactId>

        <version>3.7.0</version>

        <configuration>

          <source>1.8</source>

          <target>1.8</target>

        </configuration>

      </plugin>

    </plugins>

  </build>

</project>

Consumidor Kafka

En este punto se crea un ejemplo de un productor de Kafka, pero también puedes implementar los consumidores de Kafka para el Kafka broker de la plataforma. Para esto, puedes implementar un consumidor normal de Kafka teniendo en cuenta un par de consideraciones adicionales.

La primera es que, cuando defines la configuración del consumidor, es necesario configurar los parámetros de seguridad adecuados. Esos parámetros son similares a los utilizados en el productor ejemplo.

La segunda es que debes utilizar un nombre de grupo de consumidores diferente al que utilice la plataforma. Recomendamos utilizar un nombre basado en tu aplicación para evitar conflictos con otros grupos de consumidores.

El siguiente fragmento de código muestra un ejemplo de dicha configuración.

Configuración de consumidor Kafka

private final static String TOPIC = "ONTOLOGY_EXAMPLE_KAFKA"; private final static String BOOTSTRAP_SERVERS = "localhost:9095"; //different from the consumer group of the platform to avoid conflicts. private final static String KAFKA_CONSUMER_GROUP = "myExampleConsumerGroup"; private final static String TOKEN = "02148604a7ed4a4986c973513d35cca3"; private final static String DEVICETEMPLATE = "KafkaInserts"; private static Consumer<String, String> createConsumer() { final Properties config = new Properties(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); config.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP); config.put("security.protocol", "SASL_PLAINTEXT"); config.put("sasl.mechanism", "PLAIN"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + DEVICETEMPLATE + "\" password=\"" + TOKEN + "\";");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Create the consumer using props. final Consumer<String, String> consumer = new KafkaConsumer<>(config); // Subscribe to the topic. consumer.subscribe(Collections.singletonList(TOPIC)); return consumer; }

Recuerda que tienes que editar el nombre de la ontología, el Cliente Digital y el token utilizado.

El ejemplo de consumidor completo puede verse en el fragmento de código.

Consumidor Kafka

package com.minsait.onesait.platform.examples.kafkaclient; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class ExampleReadingKafka { private final static String TOPIC = "ONTOLOGY_EXAMPLE_KAFKA"; private final static String BOOTSTRAP_SERVERS = "localhost:9095"; //different one from the consumer group of the platform to avoid conflicts. private final static String KAFKA_CONSUMER_GROUP = "myExampleConsumerGroup"; private final static String TOKEN = "02148604a7ed4a4986c973513d35cca3"; private final static String DEVICETEMPLATE = "KafkaInserts"; private static Consumer<String, String> createConsumer() { final Properties config = new Properties(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); config.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP); config.put("security.protocol", "SASL_PLAINTEXT"); config.put("sasl.mechanism", "PLAIN"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + DEVICETEMPLATE + "\" password=\"" + TOKEN + "\";");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Create the consumer using props. final Consumer<String, String> consumer = new KafkaConsumer<>(config); // Subscribe to the topic. consumer.subscribe(Collections.singletonList(TOPIC)); return consumer; } static void runConsumer() throws InterruptedException { final Consumer<String, String> consumer = createConsumer(); final int giveUp = 350; int noRecordsCount = 0; while (true) { final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000); for (ConsumerRecord<String, String> record: consumerRecords) { System.out.printf("Consumer Record:(%d, %d, %s, %s)\n", record.partition(), record.offset(), record.key(), record.value() ); noRecordsCount++; } consumer.commitAsync(); if (noRecordsCount >= giveUp) { break; } } } public static void main(String[] args) throws InterruptedException { runConsumer(); } }

Ejemplo completo

El ejemplo completo se puede ver en el repositorio de código fuente y también se puede descargar del siguiente enlace:

The full example can be seen in source code repository and can be also downloaded from the next link: example-kafka-client.tar.gz