Introduction
Onesait Platform's Dataflow module is a low-code tool for defining and executing data flows. In this article we will see how JUnit can be used to automate dataflow testing.
To test the pipelines we need an instance of Dataflow where to execute them. To keep this test completely independent of external resources, we use the TestContainers library to run a Dataflow instance in the test itself. It would be more efficient and faster when running the tests to have a dedicated instance with all the pipelines that need to be tested.
The purpose of this example is to show how this kind of tests can be performed, and therefore the pipeline we are going to test is very simple. Any other pipeline would be tested in a similar way.
The strategy followed in this example is to use the dataflow client library to manage the flows of an instance, execute a preview remotely and validate the values of the records in each of the stages. In other words, the same thing we would do manually and visually when using the preview of a pipeline, but automated.
Other types of tests could be performed, such as the complete execution of a pipeline and validating that the number of records in the outputs and the number and type of errors are as expected, but the information provided by this type of test is less accurate.
The complete code for this example is at github.
Configuración de la instancia de Dataflow
Como se ha comentado anteriormente en este ejemplo se está usando la librería TestContainers, que permite levantar y parar contenedores de forma automática en una prueba.
Para utilzarla con JUnit 5 necesita las siguientes dependencias:
<dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers</artifactId> <version>1.18.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>junit-jupiter</artifactId> <version>1.18.3</version> <scope>test</scope> </dependency>
Configurar el contenedor de forma básica es muy sencillo, basta con anotar la clase del test con @Testcontainers
y crear un atributo con la anotación @Container
. Por ejemplo, para nuestro caso:
private static final String IMAGE = "registry.onesaitplatform.com/onesaitplatform/streamsets:4.1.0-overlord-323"; private static final int PORT = 18630; @Container public static GenericContainer<?> dataflow = new GenericContainer<>(IMAGE) .withExposedPorts(PORT) .withStartupTimeout(Duration.ofSeconds(120));
La librería Testcontainer automáticamente creará y destruirá un contenedor para cada test que se ejecute. Esto puede ralientizar la ejecución de los tests. Además cada nuevo contenedor se ejecutará sin pipelines ni librerías adicionales instaladas por lo que habrá que hacerlo en el propio proceso de test. Existen varias estrategias para solventar esto:
Por un lado se puede tener una imagen configurada con todos los pipelines y librerías requeridas instaladas. Esto es tan sencillo como crear una imagen a partir de un contenedor existente que sí se haya configurado con el comando
docker commit <container> <new_image_name>
.Por otro lado se puede tener una instancia del dataflow lista y configurada en un entorno para que se utilice en la ejecución de tests.
Test con JUnit
Creción del cliente para el Dataflow
Lo primero que hace falta para ejecutar los test es crear un instancia del cliente para el dataflow
ApiClient apiClient = new ApiClient(authType); apiClient.setUserAgent("SDC CLI"); apiClient.setBasePath(getUrl(port) + "/rest"); apiClient.setUsername(user); apiClient.setPassword(password); SimpleModule module = new SimpleModule(); module.addDeserializer(FieldJson.class, new FieldDeserializer()); apiClient.getJson().getMapper().registerModule(module);
Además de crear el cliente en el código se puede ver que se está registrando un Deserializador. Esto no es obligatorio. En este caso se utiliza para usar las clases Field del Dataflow para facilitar la consulta de los valores de los Records, pero se podrían comprobar utilizando el valor en Json original.
El código de creación de un cliente está encapsulado en una clase de utilidad para que pueda reutilizarse en varios tests github.
Importamos el pipeline a probar
Lo siguiente que hacemos es importar el pipeline a probar.
TypeRef<PipelineEnvelopeJson> returnType = new TypeRef<PipelineEnvelopeJson>() {}; PipelineEnvelopeJson pipelineEnvelopeJson = apiClient.getJson().deserialize(new File(getPipelinePath(filename)), returnType); StoreApi storeApi = new StoreApi(apiClient); PipelineEnvelopeJson importPipeline = storeApi.importPipeline( pipelineId, rev, false, false, false, false, pipelineEnvelopeJson);
Ver la sección Configurar Dataflow, para ver cómo configurar escenarios más complejos.
El código de importación de un pipeline está encapsulado en una clase de utilidad para que pueda reutilizarse en varios tests github.
Ejecutamos una preview del pipeline
Una vez tenemos el pipeline en la instancia del dataflow, vamos a ejectuar una preview del pipeline.
El siguiente fragmente de código hace exactamente eso. Ejecuta una preview, espera a que el resultado esté listo y obtiene los datos de la preview.
PreviewApi previewApi = new PreviewApi(apiClient); PreviewInfoJson previewOutput = previewApi.previewWithOverride( pipelineId, Collections.<StageOutputJson>emptyList(), rev, batchSize, batches, writeTarget, null, null); String previewerId = previewOutput.getPreviewerId(); PreviewStatus status = null; do { PreviewInfoJson previewStatus = previewApi.getPreviewStatus(pipelineId, previewerId); TimeUnit.SECONDS.sleep(5); StatusEnum statusEnum = previewStatus.getStatus(); status = PreviewStatus.valueOf(statusEnum.name()); } while (status.isActive()); assertTrue(PreviewStatus.FINISHED.equals(status)); PreviewOutputJson previewData = previewApi.getPreviewData(pipelineId, previewerId);
El código de ejecución de una preview de un pipeline está encapsulado en una clase de utilidad para que pueda reutilizarse en varios tests github.
Validación del resulado de la preview
Una vez tenemos la preview ejecutada podemos pasar a analizar los resultados y determinar si son los esperados. Básicamente esto cosisten en comprobar batch a batch si en las etapas que nos interese los records tienen los valores esperados. Es decir, que se obtienen los datos que se esperaban y que se hacen las transformaciones que se esperaban antes de enviar los datos al destino.
List<List<StageOutputJson>> batchesOutput = previewData.getBatchesOutput(); // This test only use one batch. List<StageOutputJson> stageOutputs = batchesOutput.get(0); for (StageOutputJson stageOutput : stageOutputs) { // Assert status of each stage for the selected batch. switch (stageOutput.getInstanceName()) { case "DevRawDataSource_01": Map<String, List<RecordJson>> outputs = stageOutput.getOutput(); for (List<RecordJson> records : outputs.values()) { for (RecordJson record : records) { // If needed, test record header values // Dummy test as example HeaderJson header = record.getHeader(); assertTrue(header.getSourceId().equals("rawData::0")); Field field = ApiUtils.getFieldFromRecord(record, apiClient.getJson()); Map<String, Field> root = field.getValueAsMap(); // We can test any data of the records, including the data type. assertTrue(root.get("value1").getType().equals(Field.Type.STRING)); assertTrue(root.get("value1").getValueAsString().equals("abc")); assertTrue(root.get("value2").getType().equals(Field.Type.STRING)); assertTrue(root.get("value2").getValueAsString().equals("xyz")); assertTrue(root.get("value3").getType().equals(Field.Type.STRING)); assertTrue(root.get("value3").getValueAsString().equals("lmn")); } } break; case "Trash_01": // trash does not have output break; default: // stage not tested break; } }
El ejemplo que hemos incluido sólo genera un batch de datos. En la mayoría de los casos con esto será sufiente. Si se requiren pruebas con varios batches, es recomendable usar un número manejable de los mismos para simplificar la validación de los resultados. Para cada batch que se obtenga habrá un conjunto de Records en cada una de las etapas que forman el pipeline. El test deberá comprobar que los valores se corresponden con lo que se espera en dicha etapa.
El pipeline de este ejemplo se corresponede con el de la siguiente imagen:
Los nombres DevRawDataSource_01
y Trash_01
usados en el bloque case de la captura de código anterior para determinar la etapa de ejecución, se pueden ver en la información de cada una de las etapas en el editor de pipelines: