Los pipelines RPC son un conjunto de pipelines que envían datos de un pipeline a otro sin escribir en un sistema intermediario. Consiste en un pipeline de origen y un pipeline de destino. El pipeline de origen utiliza un destino SDC RPC para escribir directamente a un origen SDC RPC en el pipeline de destino, como sigue:
...
IngestionFlow (flujo de ingesta)
...
ValidationFlow (flujo de validación)
De acuredo acuerdo con la primera imagen, tenemos un pipeline de origen que lee un fichero desde una conexión SFTP, realiza diferentes conversiones de tipo de campo y envía el resultado a otro pipeline a través de la etapa de destino SDC RPC. En la segunda imagen, el pipeline de destino recibe los datos a través de la etapa de origen SDC RPC y realiza algunas validaciones para comprobar diferentes errores.
Para configurar las etapas SDC RPC, en el pipeline de destino debemos elegir un puerto de escucha y un ID SDC RPC:
...
Así, el pipeline de origen debe estar configurado para enviar los registros a este puerto y pasar el mismo ID para establecer la conexión entre los dos pipelines:
...
Según el ValidationFlow, si no hay errores, el registro será ingestado en la plataforma (ontología TurbineTimeseries). En caso contrario, el registro se establecerá como registro erróneos. Podemos configurar los pipelines para enviar los registros erróneos registros erróneos a otro pipeline que los procese:
...
Así que aquí tenemos un tercer pipeline para gestionar los registros erróneos:
...
ErrorFlow (flujo de error)
Cuya etapa de origen SDC RPC está configurada para escuchar en el puerto 20001 y tiene el mismo ID que la configuración de "Escribir a Otro Pipeline" mostrada anteriormente. Este pipeline comprueba el origen del error y genera el correspondiente mensaje de error para insertarlo en la ontología "ErrorTimeseries".
Vamos a ver el resultado de ejecutar estos 3 pipelines con un fichero de pruebas que contiene errores. Debemos ejecutar los pipelines en el orden adecuado, es decir, del último al primero (en este ejemplo, el orden será ErrorFlow → ValidationFlow → IngestionFlow). El resultado es el siguiente:
1.
...
Ingestion Flow (Flujo de ingesta):
...
El pipeline ha leído 432 datos en el fichero de pruebas y uno (1) de ellos se ha marcado como error porque la fase "String to Float" (pasar de cadena a coma flotante) no puede convertir una cadena vacía en coma flotante. Los 431 datos restantes han sido enviados al flujo de datos ValidationFlow.
2.
...
Validation Flow (Flujo de validación):
...
El pipeline ha recibido los 431 registros del dataflow IngestionFlow y dos (2) de ellos han sido marcados como errores porque la etapa "Check TurbineId and UTCTime" (comprobar identificador de turbina y hora UTC) ha notado que falta un campo obligatorio y la "Check negative values" (comprobar valores negativos) ha notado que un valor es negativo. Los 429 datos restantes se han insertado en la ontología TurbineTimeseries.
3.
...
Error Flow (Flujo de error):
...
El pipeline ha recibido los tres (1+2=3) registros de error encontrados en los flujos de datos IngestionFlow y ValidationFlow. A estos registros se les ha introducido un mensaje de error dependiendo de su origen y se han insertado en la ontología ErrorTimeseries.
Para ver que todo ha funcionado correctamente, vamos a comprobar las ontologías para ver los datos insertados:
...
TurbineTimeseries
...
...
ErrorTimeseries