Exchanging data between RPC pipelines
RPC pipelines are a set of pipelines that pass data from one pipeline to another without writing to an intermediary system. The set consists of an origin pipeline and a destination pipeline: the origin pipeline uses an SDC RPC destination stage to write directly to the SDC RPC origin stage of the destination pipeline, as follows:
IngestionFlow
ValidationFlow
In the first picture, the origin pipeline reads a file over an SFTP connection, converts several field types and sends the result to another pipeline through an SDC RPC destination stage. In the second picture, the destination pipeline receives the data through an SDC RPC origin stage and validates each record to detect different kinds of errors.
To configure the SDC RPC stages, in the destination pipeline we must choose a listening port and an SDC RPC ID:
So, the origin pipeline must be configured to send the records to this port and pass the same ID to establish the connection between the two pipelines:
According to the ValidationFlow, if there are no errors, the record is ingested into the platform (TurbineTimeseries ontology). Otherwise, the record is marked as an error record. We can configure the pipelines to send the error records to another pipeline that processes them:
So, here we have a third pipeline to handle error records:
ErrorFlow
The SDC RPC origin stage of this pipeline is configured to listen on port 20001 and has the same ID as the "Write to Another Pipeline" configuration shown above. The pipeline checks where each error came from and generates the corresponding error message, which is inserted into the "ErrorTimeseries" ontology.
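The matching rule between an SDC RPC destination and an SDC RPC origin (same port, same ID) can be sketched in plain Python. This is only an illustration of the rule, not StreamSets code: the class names, field names, the host `localhost` and the ID `error-rpc-id` are assumptions made for the example; only port 20001 comes from the configuration described above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RpcOrigin:
    """Receiving side: the stage that listens in the destination pipeline."""
    listening_port: int
    rpc_id: str


@dataclass(frozen=True)
class RpcDestination:
    """Sending side: the stage that writes to another pipeline."""
    connection: str  # "host:port" of the receiving Data Collector
    rpc_id: str


def can_connect(sender: RpcDestination, receiver: RpcOrigin) -> bool:
    """True when the sender targets the receiver's port and uses the same ID."""
    _, _, port = sender.connection.rpartition(":")
    same_port = port.isdigit() and int(port) == receiver.listening_port
    return same_port and sender.rpc_id == receiver.rpc_id


# Hypothetical values: only the port number is taken from the text.
error_origin = RpcOrigin(listening_port=20001, rpc_id="error-rpc-id")
error_sender = RpcDestination(connection="localhost:20001", rpc_id="error-rpc-id")
wrong_id_sender = RpcDestination(connection="localhost:20001", rpc_id="other-id")

print(can_connect(error_sender, error_origin))     # True
print(can_connect(wrong_id_sender, error_origin))  # False
```

If either the port or the ID differs, the two stages are treated as unrelated, which is why both values must be copied exactly between the pipelines.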
Let's see the result of running these 3 pipelines with a test file that contains errors. We must run the pipelines in the proper order, i.e., from the last one to the first one (in this example, ErrorFlow → ValidationFlow → IngestionFlow), so that each SDC RPC origin is already listening when the pipeline that writes to it starts. The result is as follows:
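The start order can also be derived mechanically from which pipeline writes to which. The following is a minimal sketch that uses Python's standard `graphlib` module (Python 3.9 or later); the `writes_to` mapping and the `start_order` helper are illustrative names, and the edges are taken from the flows described in this example.

```python
from graphlib import TopologicalSorter

# For each pipeline, the pipelines it writes to through an SDC RPC destination.
# Those receivers must already be running before the writer is started.
writes_to = {
    "IngestionFlow": {"ValidationFlow", "ErrorFlow"},
    "ValidationFlow": {"ErrorFlow"},
    "ErrorFlow": set(),
}


def start_order(writes_to):
    """Return the pipelines ordered so that every receiver starts before its writers."""
    # TopologicalSorter expects node -> predecessors, i.e. the nodes that must come first.
    return list(TopologicalSorter(writes_to).static_order())


print(start_order(writes_to))  # ['ErrorFlow', 'ValidationFlow', 'IngestionFlow']
```

With only three pipelines the order is easy to work out by hand, but the same approach scales to larger sets of chained pipelines.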
1. IngestionFlow:
The pipeline has read 432 records from the test file and 1 of them has been marked as an error because the "String to Float" stage cannot convert an empty string to a float. The remaining 431 records have been sent to the ValidationFlow dataflow.
2. ValidationFlow:
The pipeline has received the 431 records from the IngestionFlow dataflow and 2 of them have been marked as errors: the "Check TurbineId and UTCTime" stage has detected that a mandatory field is missing, and the "Check negative values" stage has detected a negative value. The remaining 429 records have been inserted into the TurbineTimeseries ontology.
3. ErrorFlow:
The pipeline has received the 3 error records found in the IngestionFlow and ValidationFlow dataflows. An error message that depends on the origin of each record has been added to it, and the records have been inserted into the ErrorTimeseries ontology.
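The three checks and the error routing described above can be approximated in a few lines of Python. This is a simplified sketch, not the actual pipeline logic: it collapses the three pipelines into one function, the stage names are the ones mentioned above, and the `value` and `errorMessage` field names as well as the sample records are hypothetical.

```python
def classify(record):
    """Return None for a valid record, else the name of the stage that rejects it."""
    try:
        value = float(record.get("value", ""))  # an empty string raises ValueError
    except (TypeError, ValueError):
        return "String to Float"
    if not record.get("TurbineId") or not record.get("UTCTime"):
        return "Check TurbineId and UTCTime"
    if value < 0:
        return "Check negative values"
    return None


def route(records):
    """Split records into those to insert and those sent to the error flow."""
    inserted, errors = [], []
    for record in records:
        stage = classify(record)
        if stage is None:
            inserted.append(record)
        else:
            # The error flow adds a message that depends on where the error came from.
            errors.append({**record, "errorMessage": f"Rejected by stage '{stage}'"})
    return inserted, errors


# Hypothetical sample: one valid record and one record per kind of error.
sample = [
    {"TurbineId": "T1", "UTCTime": "2020-01-01T00:00:00Z", "value": "12.5"},
    {"TurbineId": "T1", "UTCTime": "2020-01-01T00:10:00Z", "value": ""},
    {"TurbineId": "", "UTCTime": "2020-01-01T00:20:00Z", "value": "3.0"},
    {"TurbineId": "T1", "UTCTime": "2020-01-01T00:30:00Z", "value": "-4.0"},
]

inserted, errors = route(sample)
print(len(inserted), len(errors))  # 1 3
```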
To confirm that everything has worked correctly, we check the data inserted into the ontologies:
TurbineTimeseries
ErrorTimeseries
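As a quick sanity check, the counts reported by the three pipelines should add up to the number of records read. The small sketch below (the `reconcile` function is a hypothetical helper, not part of the platform) reproduces that arithmetic with the figures from this run.

```python
def reconcile(read, ingestion_errors, validation_errors):
    """Derive the expected counts at each step from the number of records read."""
    sent_to_validation = read - ingestion_errors
    inserted = sent_to_validation - validation_errors
    error_records = ingestion_errors + validation_errors
    return {
        "sent_to_validation": sent_to_validation,
        "TurbineTimeseries": inserted,
        "ErrorTimeseries": error_records,
    }


expected = reconcile(read=432, ingestion_errors=1, validation_errors=2)
print(expected)
# {'sent_to_validation': 431, 'TurbineTimeseries': 429, 'ErrorTimeseries': 3}

# Every record read must end up in exactly one of the two ontologies.
assert expected["TurbineTimeseries"] + expected["ErrorTimeseries"] == 432
```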