Cómo suscribirse al Digital Broker usando el API Cliente Java?

Cómo suscribirse al Digital Broker usando el API Cliente Java?

Esta funcionalidad estará disponible a partir de la versión 2.0.0

Introducción

Como ya sabrás a estas alturas, en Onesait Platform trabajamos siempre con ontologías. Las ontologías son las entidades que gestiona el sistema y que se comparten con otros sistemas.

Para más información sobre ontologías visitar: https://onesaitplatform.atlassian.net/wiki/spaces/PT/pages/112918529.

En este tutorial vamos a ver cómo podemos suscribirnos a una ontología según el valor de alguno de sus campos. Es importante tener el claro el flujo de que van a seguir los datos. Para ello, sigue estos pasos:

  1. Creación de la suscripción en el controlpanel: aquí definiremos la ontología y el campo al cual queremos suscribirnos.

  2. Suscripción de un cliente Java: el cliente se suscribirá a la suscripción creada anteriormente, pero lo hará para un valor determinado del campo (recuerda que este campo está definido en la suscripción que hemos creado en el controlpanel).

  3. Notificación de los datos: una vez el cliente está suscrito, cada vez que se inserte un dato en la ontología de la suscripción, se hará una comprobación para saber si los datos insertados concuerdan con el valor al cual está suscrito el cliente. En el caso de que concuerde, se le notificarán los datos definidos nuevamente en la suscripción creada en el controlpanel.

Creación de la suscripción en el controlpanel

Lo primero que hay que hacer es crear la suscripción en el controlpanel y para ello nos vamos al menú CLIENTS & DIGITAL TWINS > My Subscriptions Definitions

Nos redirigirá al listado de suscripciones y para crear una nueva le damos al botón “CREATE”

Nos aparecerá un formulario con los siguientes campos:

  • Identification: nombre de la suscripción. Este identificador es el que utilizaremos más adelante para suscribirnos desde los clientes.

  • Select Ontology: ontología a la cual nos queremos suscribir. Una vez seleccionada la ontología, aparecerá en la parte inferior del formulario una instancia de dicha ontología, para ayudarnos a elegir la configuración de la suscripción.

  • Description: breve descripción de la suscripción.

  • Query field: campo de la ontología al cual nos queremos suscribir. Utilizamos JSONPath para la selección de este campo. Para este ejemplo hemos seleccionado el campo ‘status’

  • Projection: datos que se quieren notificar a los clientes suscritos. Utilizamos JSONPath para la selección de estos datos, es importante tener en cuenta que en el caso de que se quiera recibir toda la instancia de la ontología hay que poner el caracter $. Para este ejemplo hemos seleccionado el campo ‘file’

  • Query operator: se trata de un desplegable dónde podemos elegir el tipo de operación que queremos que se verifique cuando se inserta una instancia.

Una vez relleno el formulario, le damos al botón “NEW“ lo que nos redirigirá al listado de suscripciones, dónde podremos ver la que acabamos de crear:

Suscripción de un cliente Java

Esta funcionalidad está disponible a partir de la versión 1.4.2 de la librería cliente.

A continuación vamos a suscribirnos a la suscripción creada en el apartado anterior a través de un cliente Java, primero lo haremos por REST y luego por MQTT.

Se han añadido a la librería los métodos “subscribe“ y “unsubscribe“ para gestionar las suscripciones.

Suscripción vía REST

Para suscribirte a una suscripción vía REST necesitas saber los siguientes parámetros:

  • subscription: identificador de la suscripción creada en el controlpanel.

  • queryValue: el valor al que quieres suscribirte. Ten en cuenta que tiene que concordar con el campo seleccionado en la suscripción.

  • callback: se trata del enpoint al cual se quieren mandar las notificaciones.

Veamos un ejemplo práctico. En el siguiente ejemplo se muestra el método “main” de una aplicación:

public static void main(String[] args) throws SSAPConnectionException { final String token = "e7ef0742d09d4de5a3687f0cfdf7f626"; final String deviceTemplate = "TicketingApp"; final String device = "TicketMachine1"; final String ontology = "Ticket"; final String subscription = "ticketStatus"; final String queryValue = "DONE"; final String callback = "http://localhost:10000/turbine/rest/notify"; final ObjectMapper mapper = new ObjectMapper(); RestClient client = null; try { log.info("Now we are going to execute the example"); client = new RestClient("http://localhost:19000/iot-broker"); log.info("1. Connecting to {}", client.getRestServer() + " with token:" + token + " and device:" + deviceTemplate + ":" + device); log.info("(Rest client will accept all SSL certificates)"); client.connect(token, deviceTemplate, device, true); log.info("...Connected to {}", client.getRestServer()); log.info("4. Subscribing to subscription:" + subscription); JsonNode subscriptionId = client.subscribe(subscription, queryValue, callback); log.info("...Subscribed with id:" + subscriptionId.asText()); String instance = "{\"Ticket\":{\"identification\":\"\",\"status\":\"DONE\",\"email\":\"iex@email.com\",\"name\":\"Alberto\",\"response_via\":\"email\",\"file\":{\"data\":\"\",\"media\":{\"name\":\"\",\"storageArea\":\"SERIALIZED\",\"binaryEncoding\":\"Base64\",\"mime\":\"application/pdf\"}},\"coordinates\":{\"coordinates\":{\"latitude\":45.456,\"longitude\":-41.283},\"type\":\"Point\"}}}"; log.info("5. Inserting one instance:" + instance); final String idInsert = client.insert(ontology, mapper.readTree(instance).toString()); log.info("...Inserted with id {}" + idInsert); log.info("8. Unsubscribe id {}", subscriptionId.asText()); client.unsubscribe(subscriptionId.get("subscriptionId").asText()); log.info("...Unsubscribed"); log.info("7. Disconnecting"); client.disconnect(); log.info("...Disconnected"); } catch (final Exception e) { log.error("Error in process", e); } }

Es conveniente prestar atención a las siguientes líneas de código para entender el ejemplo:

  • (Línea 7, 8 y 9) Aquí se definen los parámetros necesarios para que el cliente se suscriba. En este caso nos estamos suscribiendo a todas las instancias cuyo valor del campo “status“ sea igual a “DONE“.

Ten en cuenta que esto es un ejemplo sencillo, pero que puede suscribirse a campos de tipo objeto más complejos.

  • (Línea 15) Aquí se elige la URL del entorno al cual se quiere conectar. (En este ejemplo estamos trabajando sobre local).

  • (Línea 19) Se realiza la conexión contra el iotbroker para obtener la sessionkey.

  • (Línea 23) Realizamos la suscripción.

Cada vez que se realiza una suscripción, la plataforma nos devuelve un 'subscriptionId' único para cada suscripción. Es importante guardar este dato, ya que todas las notificaciones llevarán dentro del mensaje este identificador. Además es necesario para eliminar dicha suscripción.

  • (Línea 28) Se realiza una inserción en la ontología con el valor “status“ igual a “DONE“.

  • (Línea 32) Se elimina la suscripción utilizando el subscriptionId obtenido en el paso de la suscripción (línea 23).

  • (Línea 36) Se desconecta el cliente.

Cuando se realiza la notificación al cliente suscrito, recibirá un mensaje de este estilo:

{ "messageId": null, "sessionKey": "4a786ccb-09cb-4839-a7e1-5c7f9becb620", "direction": "RESPONSE", "messageType": "INDICATION", "transactionId": null, "body": { "@type": "SSAPBodyIndicationMessage", "ontology": null, "subscriptionId": "0df86f69-d252-48d3-ad4a-9ba09308f40e", "data": "[{\"data\":\"\",\"media\":{\"name\":\"\",\"storageArea\":\"SERIALIZED\",\"binaryEncoding\":\"Base64\",\"mime\":\"application/pdf\"}}]" } }

Es importante saber que la plataforma tiene definido un número de reintentos para las notificaciones y si no consigue contactar con el cliente asumirá que no está activo y eliminará la suscripción. Por lo que sería necesario que el cliente volviera a suscribirse de nuevo al rearrancar.

Suscripción vía MQTT

Para suscribirte a una suscripción vía MQTT, necesitas saber los siguientes parámetros:

  • subscription: identificador de la suscripción creada en el controlpanel.

  • queryValue: el valor al que quieres suscribirte. Ten en cuenta que tiene que concordar con el campo seleccionado en la suscripción.

Veamos un ejemplo práctico. En el siguiente ejemplo se muestra el método “main” de una aplicación:

public static void main(String[] args) throws InterruptedException, IOException, UnrecoverableKeyException, KeyManagementException, KeyStoreException, NoSuchAlgorithmException, CertificateException, MqttClientException { String url = "tcp://localhost:1883"; MQTTClient clientSecure = new MQTTClient(url, false); int timeout = 500; String token = "e7ef0742d09d4de5a3687f0cfdf7f626"; String deviceTemplate = "TicketingApp"; String device = "MQTTApp01"; String ontology = "Ticket"; String subscription = "ticketStatus"; String queryValue = "DONE"; final ObjectMapper mapper = new ObjectMapper(); log.info("Using Ontology:" + ontology + " and instanceOntology"); clientSecure.connect(token, deviceTemplate, device, null, "", null); clientSecure.setTimeout(timeout); clientSecure.subscribeCommands(new SubscriptionListener() { @Override public void onMessageArrived(String message) { try { final JsonNode cmdMsg = mapper.readTree(message); generateLogMessage(clientSecure, timeout, cmdMsg); } catch (final IOException e) { log.error(e.getMessage()); } catch (final MqttClientException e) { // TODO Auto-generated catch block log.error(e.getMessage()); } } }); final String subsId = clientSecure.subscribe(subscription, queryValue, new SubscriptionListener() { @Override public void onMessageArrived(String message) { try { final JsonNode cmdMsg = mapper.readTree(message); System.out.println(message); } catch (final IOException e) { log.error(e.getMessage()); } } }); clientSecure.unsubscribe(subsId); }

Es conveniente prestar atención a las siguientes líneas de código para entender el ejemplo:

  •  

  • (Línea 5) Aquí se elige la URL del entorno al cual se quiere conectar. (En este ejemplo estamos trabajando sobre local).

  • (Línea 12, 13) Aquí se definen los parámetros necesarios para que el cliente se suscriba. En este caso nos estamos suscribiendo a todas las instancias cuyo valor del campo “status“ sea igual a “DONE“.

Ten en cuenta que esto es un ejemplo sencillo, pero que puede suscribirse a campos de tipo objeto más complejos.

  • (Línea 18) Se realiza la conexión contra el iotbroker para obtener la sessionkey.

  • (Línea 38) Realizamos la suscripción.

Cada vez que se realiza una suscripción, la plataforma nos devuelve un 'subsId' único para cada suscripción. Es importante guardar este dato, ya que todas las notificaciones llevarán dentro del mensaje este identificador. Además es necesario para eliminar dicha suscripción.

  • (Línea 55) Se elimina la suscripción utilizando el subscriptionId obtenido en el paso de la suscripción.

Es importante saber que cuando la plataforma detecta que el cliente se ha desconectado elimina automáticamente la suscripción, por lo que es necesario que el cliente vuelva a suscribirse cuando rearranque.