How to subscribe to the Digital Broker using the Java Client API?

This functionality will be available from version 2.0.0 onward.

Introduction

As you probably know by now, Onesait Platform always works with ontologies. Ontologies are the entities that the system manages and shares with other systems.

For more information on ontologies see: https://onesaitplatform.atlassian.net/wiki/spaces/PT/pages/112820241 .

This tutorial explains how to subscribe to an ontology according to the value of one of its fields. It is important to have a clear idea of the data flow, which consists of the following steps:

  1. Creating the subscription in the control panel: you will define here the ontology and the field you want to subscribe to.

  2. Subscription of a Java client: the client subscribes to the subscription created previously, but does so for a specific value of the field (remember that this field is defined in the subscription you created in the control panel).

  3. Data notification: once the client is subscribed, every time an instance is inserted in the subscription's ontology, a check is made to verify whether the inserted data matches the value to which the client is subscribed. If it does, the data defined in the subscription created in the control panel is notified to the client.
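The three steps above can be pictured with a small, self-contained simulation. This is only a sketch of the data flow, not platform code: the class SubscriptionFlowSketch and its methods are hypothetical, instances are reduced to flat maps, and the check is assumed to be a plain equality comparison.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative stand-in for the matching done on insert; not platform code.
class SubscriptionFlowSketch {

    // Step 1: fields fixed by the subscription created in the control panel.
    private final String queryField;
    private final String projectionField;

    // Step 2: each client registers the value it is interested in.
    private final Map<String, List<Consumer<Object>>> clientsByValue = new HashMap<>();

    SubscriptionFlowSketch(String queryField, String projectionField) {
        this.queryField = queryField;
        this.projectionField = projectionField;
    }

    void subscribe(String queryValue, Consumer<Object> onNotify) {
        clientsByValue.computeIfAbsent(queryValue, v -> new ArrayList<>()).add(onNotify);
    }

    // Step 3: on every insert, compare the query field with the subscribed values
    // and send the projected data to the clients whose value matches.
    void onInsert(Map<String, Object> instance) {
        Object actual = instance.get(queryField);
        List<Consumer<Object>> matching = clientsByValue.get(String.valueOf(actual));
        if (matching == null) {
            return;
        }
        Object projected = instance.get(projectionField);
        for (Consumer<Object> client : matching) {
            client.accept(projected);
        }
    }

    public static void main(String[] args) {
        SubscriptionFlowSketch ticketStatus = new SubscriptionFlowSketch("status", "file");
        ticketStatus.subscribe("DONE", data -> System.out.println("Notified: " + data));

        Map<String, Object> pending = new HashMap<>();
        pending.put("status", "PENDING");
        pending.put("file", "draft.pdf");
        ticketStatus.onInsert(pending); // no match, nothing is printed

        Map<String, Object> done = new HashMap<>();
        done.put("status", "DONE");
        done.put("file", "ticket.pdf");
        ticketStatus.onInsert(done); // prints "Notified: ticket.pdf"
    }
}
```

Only the second insert reaches the client, because it is the only one whose "status" equals the subscribed value.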

Creating a subscription from the control panel

The first thing to do is to create the subscription in the control panel. To do that, go to the menu CLIENTS & DIGITAL TWINS > My Subscriptions Definitions.

You will be redirected to the list of subscriptions. To create a new subscription, click on "CREATE".

A form will appear with the following fields:

  • Identification: name of the subscription. This identifier is the one that you will use later to subscribe from the clients.

  • Description: short description of the subscription.

  • Select Ontology: ontology to which you want to subscribe. Once the ontology is selected, an instance of this ontology will appear at the bottom of the form, to help you choose the subscription configuration.

  • Query field: ontology field to which you want to subscribe. Use JSONPath to select this field. In this example, the field 'status' has been selected.

  • Projection: data to be notified to the subscribed clients. Use JSONPath to select this data. Bear in mind that, if you want to receive the whole ontology instance, you have to use the $ character. In this example, the field 'file' has been selected.

  • Query operator: this is a drop-down menu where you can choose the type of operation you want to check when an instance is inserted.

Once the form is filled in, click on the "NEW" button. You will be redirected to the list of subscriptions, where you can see the one you have just created.
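To make the Query field and Projection settings more concrete, the following sketch resolves a tiny subset of JSONPath over nested maps. The resolver JsonPathSketch.resolve is hypothetical (the platform uses its own JSONPath evaluation), and the paths $.Ticket.status and $.Ticket.file are assumptions based on the Ticket instance used in the Java examples of this tutorial.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: handles only "$" and dotted paths such as "$.Ticket.status".
// Filters, wildcards and array indexes of real JSONPath are not supported.
class JsonPathSketch {

    static Object resolve(String path, Map<String, Object> instance) {
        if (!path.startsWith("$")) {
            throw new IllegalArgumentException("Path must start with $: " + path);
        }
        String rest = path.substring(1);
        if (rest.isEmpty()) {
            return instance; // "$" selects the whole ontology instance
        }
        if (!rest.startsWith(".")) {
            throw new IllegalArgumentException("Unsupported path: " + path);
        }
        Object current = instance;
        for (String key : rest.substring(1).split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // the path goes deeper than the data
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> file = new HashMap<>();
        file.put("mime", "application/pdf");

        Map<String, Object> ticket = new HashMap<>();
        ticket.put("status", "DONE");
        ticket.put("file", file);

        Map<String, Object> instance = new HashMap<>();
        instance.put("Ticket", ticket);

        // Query field: the value compared against what each client subscribed to.
        System.out.println(resolve("$.Ticket.status", instance)); // prints "DONE"
        // Projection: the part of the instance sent in the notification.
        System.out.println(resolve("$.Ticket.file", instance)); // prints "{mime=application/pdf}"
        // "$" as projection returns the whole instance.
        System.out.println(resolve("$", instance) == instance); // prints "true"
    }
}
```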

Subscription of a Java client

This functionality is available from version 1.4.2 of the client library onwards.

Next, we explain how to subscribe, through a Java client, to the subscription created in the previous section: first via REST and then via MQTT.

The methods "subscribe" and "unsubscribe" have been added to the library to manage the subscriptions.

Subscription via REST

To subscribe to a subscription via REST, you need to know the following parameters:

  • subscription: identifier of the subscription created in the control panel.

  • queryValue: the value you want to subscribe to. Bear in mind that it must match the field selected in the subscription.

  • callback: the endpoint to which you want to send the notifications.

Let's see a practical example. The following example shows the "main" method of an application:

public static void main(String[] args) throws SSAPConnectionException {
    final String token = "e7ef0742d09d4de5a3687f0cfdf7f626";
    final String deviceTemplate = "TicketingApp";
    final String device = "TicketMachine1";
    final String ontology = "Ticket";

    final String subscription = "ticketStatus";
    final String queryValue = "DONE";
    final String callback = "http://localhost:10000/turbine/rest/notify";

    final ObjectMapper mapper = new ObjectMapper();
    RestClient client = null;
    try {
        log.info("Now we are going to execute the example");
        client = new RestClient("http://localhost:19000/iot-broker");
        log.info("1. Connecting to {}", client.getRestServer() + " with token:" + token
                + " and device:" + deviceTemplate + ":" + device);
        log.info("(Rest client will accept all SSL certificates)");
        client.connect(token, deviceTemplate, device, true);
        log.info("...Connected to {}", client.getRestServer());

        log.info("2. Subscribing to subscription:" + subscription);
        JsonNode subscriptionId = client.subscribe(subscription, queryValue, callback);
        log.info("...Subscribed with id:" + subscriptionId.asText());

        String instance = "{\"Ticket\":{\"identification\":\"\",\"status\":\"DONE\",\"email\":\"iex@email.com\",\"name\":\"Alberto\",\"response_via\":\"email\",\"file\":{\"data\":\"\",\"media\":{\"name\":\"\",\"storageArea\":\"SERIALIZED\",\"binaryEncoding\":\"Base64\",\"mime\":\"application/pdf\"}},\"coordinates\":{\"coordinates\":{\"latitude\":45.456,\"longitude\":-41.283},\"type\":\"Point\"}}}";
        log.info("3. Inserting one instance:" + instance);
        final String idInsert = client.insert(ontology, mapper.readTree(instance).toString());
        log.info("...Inserted with id {}", idInsert);

        log.info("4. Unsubscribe id {}", subscriptionId.asText());
        client.unsubscribe(subscriptionId.get("subscriptionId").asText());
        log.info("...Unsubscribed");

        log.info("5. Disconnecting");
        client.disconnect();
        log.info("...Disconnected");
    } catch (final Exception e) {
        log.error("Error in process", e);
    }
}

Pay attention to the following lines of code to understand the example:

  • (Lines 7, 8 and 9) Here you define the parameters the client needs in order to subscribe. In this case, we are subscribing to all the instances whose value in the "status" field equals "DONE".

  • (Line 15) Here you choose the URL of the environment you want to connect to. (In this example, we are working locally).

  • (Line 19) The connection is made against the Digital Broker (iot-broker) to obtain the session key.

  • (Line 23) The subscription is made.

  • (Line 28) An insertion is made in the ontology where the value "status" equals "DONE".

  • (Line 32) The subscription is removed using the subscriptionId obtained in the subscription step (line 23).

  • (Line 36) The client is disconnected.

When the subscribed client is notified, it will receive a message like this:

{
  "messageId": null,
  "sessionKey": "4a786ccb-09cb-4839-a7e1-5c7f9becb620",
  "direction": "RESPONSE",
  "messageType": "INDICATION",
  "transactionId": null,
  "body": {
    "@type": "SSAPBodyIndicationMessage",
    "ontology": null,
    "subscriptionId": "0df86f69-d252-48d3-ad4a-9ba09308f40e",
    "data": "[{\"data\":\"\",\"media\":{\"name\":\"\",\"storageArea\":\"SERIALIZED\",\"binaryEncoding\":\"Base64\",\"mime\":\"application/pdf\"}}]"
  }
}
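The REST example only registers a callback URL, so something has to be listening there. As a minimal sketch, the endpoint could be served with the HTTP server bundled in the JDK. The class NotificationReceiverSketch is hypothetical, and it is an assumption that the notification arrives as the body of an HTTP request to the callback; the sketch accepts any HTTP method because the one actually used is not stated here.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical receiver for the callback URL used in the REST example.
class NotificationReceiverSketch {

    // Starts an HTTP server that hands every request body received on `path` to `handler`.
    static HttpServer start(int port, String path, Consumer<String> handler) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext(path, exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                handler.accept(readAll(in));
            }
            exchange.sendResponseHeaders(200, -1); // 200 OK with an empty body
            exchange.close();
        });
        server.start();
        return server;
    }

    private static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Same port and path as the callback value in the REST example.
        start(10000, "/turbine/rest/notify",
                body -> System.out.println("Notification: " + body));
    }
}
```

Note that, in the message shown above, the "data" field inside "body" is itself a JSON document encoded as a string, so a handler that needs the projected data has to parse that value a second time.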

Subscription via MQTT

To subscribe to a subscription via MQTT, you need to know the following parameters:

  • subscription: identifier of the subscription created in the control panel.

  • queryValue: the value you want to subscribe to. Bear in mind that it must match the field selected in the subscription.

Let's see a practical example. The following example shows the "main" method of an application:

public static void main(String[] args) throws InterruptedException, IOException,
        UnrecoverableKeyException, KeyManagementException, KeyStoreException,
        NoSuchAlgorithmException, CertificateException, MqttClientException {

    String url = "tcp://localhost:1883";
    MQTTClient clientSecure = new MQTTClient(url, false);
    int timeout = 500;
    String token = "e7ef0742d09d4de5a3687f0cfdf7f626";
    String deviceTemplate = "TicketingApp";
    String device = "MQTTApp01";
    String ontology = "Ticket";
    String subscription = "ticketStatus";
    String queryValue = "DONE";
    final ObjectMapper mapper = new ObjectMapper();

    log.info("Using Ontology:" + ontology + " and instanceOntology");

    clientSecure.connect(token, deviceTemplate, device, null, "", null);
    clientSecure.setTimeout(timeout);

    clientSecure.subscribeCommands(new SubscriptionListener() {
        @Override
        public void onMessageArrived(String message) {
            try {
                final JsonNode cmdMsg = mapper.readTree(message);
                // generateLogMessage is a helper that is not shown in this example
                generateLogMessage(clientSecure, timeout, cmdMsg);
            } catch (final IOException e) {
                log.error(e.getMessage());
            } catch (final MqttClientException e) {
                log.error(e.getMessage());
            }
        }
    });

    // Subscribe for the chosen value of the query field; matching
    // notifications are delivered to onMessageArrived
    final String subsId = clientSecure.subscribe(subscription, queryValue, new SubscriptionListener() {
        @Override
        public void onMessageArrived(String message) {
            try {
                final JsonNode cmdMsg = mapper.readTree(message);
                System.out.println(message);
            } catch (final IOException e) {
                log.error(e.getMessage());
            }
        }
    });

    // The example removes the subscription right away. A real application
    // would stay subscribed for as long as it needs to receive
    // notifications.

    // Remove the subscription using the id returned by subscribe
    clientSecure.unsubscribe(subsId);
}

Pay attention to the following lines of code to understand the example:

  • (Line 5) Here you choose the URL of the environment you want to connect to. (In this example, we are working locally).

  • (Lines 12 and 13) Here you define the parameters the client needs in order to subscribe. In this case, we are subscribing to all the instances whose value in the "status" field equals "DONE".

  • (Line 18) The connection is made against the Digital Broker (iot-broker) to obtain the session key.

  • (Line 38) The subscription is made.

  • (Line 55) The subscription is removed using the subscriptionId obtained in the subscription step (line 38).
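Since the MQTT example unsubscribes immediately after subscribing, a client that wants to see a notification has to wait for it first. One possible way to do that, sketched here with JDK classes only, is a listener that blocks the main thread until the first message arrives or a timeout expires. The SubscriptionListener interface below is a local stand-in with the same method as the one used in the example, and AwaitingListener is a hypothetical helper, not part of the client library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AwaitNotificationSketch {

    // Local stand-in with the same shape as the listener used in the MQTT example.
    interface SubscriptionListener {
        void onMessageArrived(String message);
    }

    // Stores the first message and lets another thread wait for it.
    static class AwaitingListener implements SubscriptionListener {
        private final CountDownLatch arrived = new CountDownLatch(1);
        private final AtomicReference<String> firstMessage = new AtomicReference<>();

        @Override
        public void onMessageArrived(String message) {
            // Only the first message is kept; later ones are ignored.
            if (firstMessage.compareAndSet(null, message)) {
                arrived.countDown();
            }
        }

        // Returns the first message, or null if none arrived within the timeout.
        String await(long timeout, TimeUnit unit) throws InterruptedException {
            return arrived.await(timeout, unit) ? firstMessage.get() : null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitingListener listener = new AwaitingListener();

        // Simulates the MQTT client thread delivering a notification.
        new Thread(() -> listener.onMessageArrived("{\"messageType\":\"INDICATION\"}")).start();

        String message = listener.await(5, TimeUnit.SECONDS);
        System.out.println(message != null ? "Received: " + message : "Timed out");
        // With the real client, unsubscribe would be called at this point.
    }
}
```

With the real client, an instance of such a listener would be passed as the third argument of subscribe, and unsubscribe would be called only after await returns.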