...
The complete code for this example is at github.
...
Dataflow
...
Como se ha comentado anteriormente en este ejemplo se está usando la librería TestContainers, que permite levantar y parar contenedores de forma automática en una prueba.
...
Instance Configuration
As mentioned earlier, this example uses the TestContainers library, which allows for automatically starting and stopping containers in a test.
To use it with JUnit 5, you need the following dependencies:
Code Block |
---|
<dependency> <groupId>org.testcontainers</groupId> <artifactId>testcontainers</artifactId> <version>1.18.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>junit-jupiter</artifactId> <version>1.18.3</version> <scope>test</scope> </dependency> |
Configurar el contenedor de forma básica es muy sencillo, basta con anotar la clase del test con @Testcontainers
y crear un atributo con la anotación @Container
. Por ejemplo, para nuestro casoConfiguring the container in a basic way is very simple; you just need to annotate the test class with @Testcontainers
and create an attribute with the @Container
. annotation. For example, in our case:
Code Block | ||
---|---|---|
| ||
private static final String IMAGE = "registry.onesaitplatform.com/onesaitplatform/streamsets:4.1.0-overlord-323"; private static final int PORT = 18630; @Container public static GenericContainer<?> dataflow = new GenericContainer<>(IMAGE) .withExposedPorts(PORT) .withStartupTimeout(Duration.ofSeconds(120)); |
La librería Testcontainer automáticamente creará y destruirá un contenedor para cada test que se ejecute. Esto puede ralientizar la ejecución de los tests. Además cada nuevo contenedor se ejecutará sin pipelines ni librerías adicionales instaladas por lo que habrá que hacerlo en el propio proceso de test. Existen varias estrategias para solventar esto:
...
The TestContainers library will automatically create and destroy a container for each test that runs. This can slow down the execution of tests. Additionally, each new container will run without any pre-installed pipelines or additional libraries, so these will need to be installed in the test process itself. There are several strategies to address this:
On one hand, you can have an image configured with all the required pipelines and libraries installed. This is as simple as creating an image from an existing container that has been configured using the command
docker commit <container> <new_image_name>
.Por otro lado se puede tener una instancia del dataflow lista y configurada en un entorno para que se utilice en la ejecución de tests.
Test con JUnit
Creción del cliente para el Dataflow
...
On the other hand, you can have a ready and configured Dataflow instance in an environment to be used during test execution.
Testing with JUnit
Creating a Dataflow Client
The first step needed to run the tests is to create an instance of the Dataflow client.
Code Block |
---|
ApiClient apiClient = new ApiClient(authType); apiClient.setUserAgent("SDC CLI"); apiClient.setBasePath(getUrl(port) + "/rest"); apiClient.setUsername(user); apiClient.setPassword(password); SimpleModule module = new SimpleModule(); module.addDeserializer(FieldJson.class, new FieldDeserializer()); apiClient.getJson().getMapper().registerModule(module); |
Además de crear el cliente en el código se puede ver que se está registrando un Deserializador. Esto no es obligatorio. En este caso se utiliza para usar las clases Field del Dataflow para facilitar la consulta de los valores de los Records, pero se podrían comprobar utilizando el valor en Json original.
El código de creación de un cliente está encapsulado en una clase de utilidad para que pueda reutilizarse en varios tests github.
Importamos el pipeline a probar
Lo siguiente que hacemos es importar el pipeline a probarIn addition to creating the client, you can see in the code that a Deserializer is being registered. This is not mandatory. In this case, it is used to leverage the Dataflow Field classes to facilitate querying the values of the records, but the values could also be checked using the original JSON format.
The code for creating a client is encapsulated in a utility class so that it can be reused across multiple tests. github.
Import the pipeline to be tested.
The next step will be to import the pipeline to be tested.
Code Block |
---|
TypeRef<PipelineEnvelopeJson> returnType = new TypeRef<PipelineEnvelopeJson>() {}; PipelineEnvelopeJson pipelineEnvelopeJson = apiClient.getJson().deserialize(new File(getPipelinePath(filename)), returnType); StoreApi storeApi = new StoreApi(apiClient); PipelineEnvelopeJson importPipeline = storeApi.importPipeline( pipelineId, rev, false, false, false, false, pipelineEnvelopeJson); |
Ver la sección Configurar Dataflow, para ver cómo configurar escenarios más complejos.
El código de importación de un pipeline está encapsulado en una clase de utilidad para que pueda reutilizarse en varios tests github.
Ejecutamos una preview del pipeline
Una vez tenemos el pipeline en la instancia del dataflow, vamos a ejectuar una preview del pipeline.
El siguiente fragmente de código hace exactamente eso. Ejecuta una preview, espera a que el resultado esté listo y obtiene los datos de la preview.
See Configuring Dataflow to learn how to set up more complex scenarios.
The code for creating a pipeline is encapsulated in a utility class so that it can be reused across multiple tests. github.
Run a pipeline preview.
Once the pipeline is in the Dataflow instance, we will run a preview.
The following code snippet does exactly that. It runs a preview, waits for the result to be ready, and retrieves the preview data.
Code Block |
---|
PreviewApi previewApi = new PreviewApi(apiClient);
PreviewInfoJson previewOutput = previewApi.previewWithOverride(
pipelineId,
Collections.<StageOutputJson>emptyList(),
rev,
batchSize,
batches,
writeTarget,
null,
null);
String previewerId = previewOutput.getPreviewerId();
PreviewStatus status = null;
do {
PreviewInfoJson previewStatus = previewApi.getPreviewStatus(pipelineId, previewerId);
TimeUnit.SECONDS.sleep(5);
StatusEnum statusEnum = previewStatus.getStatus();
status = PreviewStatus.valueOf(statusEnum.name());
} while (status.isActive());
assertTrue(PreviewStatus.FINISHED.equals(status));
PreviewOutputJson previewData = previewApi.getPreviewData(pipelineId, previewerId); |
El código de ejecución de una preview de un pipeline está encapsulado en una clase de utilidad para que pueda reutilizarse en varios tests github.
Validación del resulado de la preview
...
The code for running a pipeline preview is encapsulated in a utility class so that it can be reused across multiple tests. github.
Validating the Preview Result
Once the preview has been run, we can analyze the results to determine if they are as expected.
Essentially, this involves checking batch by batch if the records in the stages of interest have the expected values. In other words, we verify that the expected data is obtained and that the expected transformations are performed before sending to destination.
Code Block |
---|
List<List<StageOutputJson>> batchesOutput = previewData.getBatchesOutput(); // This test only use one batch. List<StageOutputJson> stageOutputs = batchesOutput.get(0); for (StageOutputJson stageOutput : stageOutputs) { // Assert status of each stage for the selected batch. switch (stageOutput.getInstanceName()) { case "DevRawDataSource_01": Map<String, List<RecordJson>> outputs = stageOutput.getOutput(); for (List<RecordJson> records : outputs.values()) { for (RecordJson record : records) { // If needed, test record header values // Dummy test as example HeaderJson header = record.getHeader(); assertTrue(header.getSourceId().equals("rawData::0")); Field field = ApiUtils.getFieldFromRecord(record, apiClient.getJson()); Map<String, Field> root = field.getValueAsMap(); // We can test any data of the records, including the data type. assertTrue(root.get("value1").getType().equals(Field.Type.STRING)); assertTrue(root.get("value1").getValueAsString().equals("abc")); assertTrue(root.get("value2").getType().equals(Field.Type.STRING)); assertTrue(root.get("value2").getValueAsString().equals("xyz")); assertTrue(root.get("value3").getType().equals(Field.Type.STRING)); assertTrue(root.get("value3").getValueAsString().equals("lmn")); } } break; case "Trash_01": // trash does not have output break; default: // stage not tested break; } } |
El ejemplo que hemos incluido sólo genera un batch de datos. En la mayoría de los casos con esto será sufiente. Si se requiren pruebas con varios batches, es recomendable usar un número manejable de los mismos para simplificar la validación de los resultados. Para cada batch que se obtenga habrá un conjunto de Records en cada una de las etapas que forman el pipeline. El test deberá comprobar que los valores se corresponden con lo que se espera en dicha etapa.
El pipeline de este ejemplo se corresponede con el de la siguiente imagen:
...
Los nombres DevRawDataSource_01
y Trash_01
usados en el bloque case de la captura de código anterior para determinar la etapa de ejecución, se pueden ver en la información de cada una de las etapas en el editor de pipelinesThe example we have included only generates one batch of data. In most cases, this will be sufficient. If tests with multiple batches are required, it is advisable to use a manageable number of them to simplify the validation of results. For each batch obtained, there will be a set of records at each stage of the pipeline. The test should verify that the values correspond to what is expected at each stage.
As we can see in the following example, the pipeline shows a visualization of the data in the output of each stage.
...
The names DevRawDataSource_01
and Trash_01
used in the case block of the previous code snippet to determine the execution stage can be seen in the information of each stage in the pipeline editor:
...