Distributed Transaction Service: How to use it in Platform
This functionality is available from version 1.6.0.
Introduction
We define the concept of transaction as a set of operations on a database that must be executed as a unit, since there are times when it is necessary that several operations on a database be carried out in bulk, that is, to execute or all or none.
Transactional support allows applications allowed to:
All operations to be performed are executed.
When one of the operations of a transaction fails, all other operations performed so far are reversed.
Integration with Digital Broker
Three new operations have been added:
In all of them it is necessary to add in the request the “Authorization” header with the sessionKey obtained previously with the “Join” method.
Start: GET operation that initiates a transaction. As a result of this operation. you will obtain the transactionId that will serve you as the unique identifier of the transaction.
Commit: GET operation that commits the transaction, that is, executes all the operations assigned to the transaction. In this case it is necessary to pass the transactionId, of the transaction to be executed, and the lockOntologies parameter, which allows you to decide whether you want to block the affected ontologies in the transaction or not.
Rollbak: Deletes a transaction and reverts all operations already executed. This service receives the transactionId as its argument.
In addition, all INSERT, UPDATE, DELETE methods have been extended to support a new optional header, TransactionId , so that if this header is added, the operation will be added to the transaction, while, if omitted, the operation will be performed as normal.
Integration with Java Client
For now, only transactions for REST operations are supported. In the future MQTT protocol will also be supported. We recommend reading the tutorial https://onesaitplatform.atlassian.net/wiki/spaces/OP/pages/14221371 to understand how this library works and how to configure it.
The Java client library has been extended to support platform transactions.
For this, a Transaction class has been created. This class has the INSERT, UPDATE and DELETE methods and also adds the following methods:
ConfigureConnection: is in charge of opening the connection with the platform and receives as its input parameters:
ConnectionType: this is an Enum that can be either REST or MQTT (Remember that for now only REST is supported).
Properties: object that stores the parameters necessary to establish the connection. To make it as standard as possible, an Enum RestProperty class has also been defined with the list of possible parameters required to establish the connection.
Start: creates a new transaction and receive as input parameters:
Device: Device name as defined in the platform.
Token: a token associated with the Device defined in the platform.
DeviceTemplate: identifier chosen by the user to identify the connection.
Commit: performs the operations associated with a transaction and receives as its input parameter:
LockOntologies: Boolean that indicates if you want the ontologies to be blocked or not.
Below is a practical example of use:
At least version 1.3.8 of the library must be used.
public static void main(String[] args) throws SSAPConnectionException {
final String token = "e7ef0742d09d4de5a3687f0cfdf7f626";
final String deviceTemplate = "TicketingApp";
final String device = "TicketMachine1";
final String ontology = "Ticket";
final ObjectMapper mapper = new ObjectMapper();
Properties prop = new Properties();
prop.put(Transaction.CONNECTION_TYPE, Transaction.ConnectionType.REST.name());
prop.put(Transaction.DIGITAL_BROKER_REST_ENDPOINT, "http://localhost:19000/iot-broker");
Transaction tx = new Transaction();
tx.configureConnection(prop);
try {
log.info("Example with transaction:");
log.info("1. Starting transaction.");
String transactionId = tx.start(token, deviceTemplate, device);
log.info("...Started transaction. Transaction id: {}", transactionId);
String instance = "{\"Ticket\":{\"identification\":\"WithoutTransaction\",\"status\":\"DONE\",\"email\":\"iex@email.com\",\"name\":\"Alberto\",\"response_via\":\"email\",\"file\":{\"data\":\"\",\"media\":{\"name\":\"\",\"storageArea\":\"SERIALIZED\",\"binaryEncoding\":\"Base64\",\"mime\":\"application/pdf\"}},\"coordinates\":{\"coordinates\":{\"latitude\":45.456,\"longitude\":-41.283},\"type\":\"Point\"}}}";
log.info("2. Inserting one instance with transaction id {}: {}", transactionId, instance);
String idInsertTx = tx.insert(ontology, mapper.readTree(instance).toString());
log.info("...Inserted instance in transaction {} with id {}", transactionId, idInsertTx);
instance = "{\"Ticket\":{\"identification\":\"ticket_updated\",\"status\":\"DONE\",\"email\":\"iex@email.com\",\"name\":\"Alberto\",\"response_via\":\"email\",\"file\":{\"data\":\"\",\"media\":{\"name\":\"\",\"storageArea\":\"SERIALIZED\",\"binaryEncoding\":\"Base64\",\"mime\":\"application/pdf\"}},\"coordinates\":{\"coordinates\":{\"latitude\":45.456,\"longitude\":-41.283},\"type\":\"Point\"}}}";
log.info("3. Inserting one instance with transaction id {}: {}", transactionId, instance);
idInsertTx = tx.insert(ontology, mapper.readTree(instance).toString());
log.info("...Inserted instance in transaction {} with id {}", transactionId, idInsertTx);
log.info("4. Commit transaction with transaction id {}", transactionId);
tx.commit(true);
log.info("..Transaction commited with id {}", transactionId);
log.info("Example with transaction OK!!!");
} catch (final Exception e) {
log.error("Error in process", e);
tx.rollback();
}
}It is important to look at some details:
(Line 8) We add the CONNECTION_TYPE parameter to the Properties object. In this case we will make the connection via REST so we will also have to add the DIGITAL_BROKER_REST_ENDPOINT parameter.
(Line 11) Opening connection with the platform.
(Line 16) Here the transaction is created. The start method returns the transactionId but this parameter is merely informative, because the library internally handles the transaction. That is, all operations assigned to the same Transaction object will belong to the same transaction.
(Lines 21 and 26) Two insert operations are added to the transaction.
(Line 30) The transaction’s commit is executed.
(Line 37) In the case of any of the operations throw an exception, the rollback operation is thrown
Integration with IoTClient4SpringBoot client
It is recommended to read the tutorial https://onesaitplatform.atlassian.net/wiki/spaces/DOCT/pages/2220787914 for a total understanding of this library.
The IoTClient4SpringBoot library has also been extended to support operations within a transaction.
For this, a new @IoTBrokerTransaction tag has been created that will be assigned at the method level, so that when a method marked with this tag is executed, the following flow will be followed:
Upon entering the method a new transaction will open.
All INSERT, UPDATE and DELETE operations that are defined within that method will be assigned to the same transaction.
When exiting the method:
If everything went well, the transaction commit is performed.
If an exception is thrown, rollback of all operations is performed and the transaction is eliminated.
In the event that more than one IoTBrokerTransaction tag is found in the same application flow, the first in the first method will be taken as valid, the subsequent tags being ignored.
Below is a practical example of how to use this functionality:
At least version 1.3.8 of the library must be used.
@IoTBrokerTransaction(lockOntologies = true)
public void insertInstances() throws Exception {
log.info("Ticket is going to be inserted.");
CitizenTicket ticket = new CitizenTicket();
ticket.setEmail("onesaitplatform@indra.es");
ticket.setStatus("PENDING");
ticket.setName("Onesait Platform");
ticket.setIdentification("Ticket creado desde el check...");
CitizenTicketOntology ticketOnt = new CitizenTicketOntology();
ticketOnt.setCitizenTicket(ticket);
String result = ticketRepository.insertTicket(ticketOnt);
log.info("Result insert 1: {}", result);
}Bear in mind that:
(Line 1) the @IoTBrokerTransaction tag is added and the lockOntologies = true parameter is added. This parameter is used when doing the transaction’s commit and indicates whether the ontologies are going to be blocked during the transaction or not. If that this parameter is not indicated, the default value will be false.
(Line 13) When making an insertion, it is done in the same way that the inserts have been made outside the transactions. Internally, the library is responsible for adding this operation to the transaction because it is within a method with the @IoTBrokerTransaction tag.