Introducción
Para aumentar las capacidades del Dataflow es posible desarrollar etapas personalizadas que se pueden utilizar en los pipelines que se creen con el propio Dataflow del mismo modo que cualquiera de los componentes propios del Dataflow.
Este artículo describe cómo se pueden hacer test automáticos con JUnit para probar las esatapas personalizadas que se desarrollen.
Para mostrar ejemplos, usaremos el proyecto <github>. Dicho proyecto incluye ejemplos de pruebas para los componentes JDBC del propio Dataflow. De esta forma muestra ejemplos tanto para etapas de tipo origen, de tipo destino y de tipo procesadores.
Configuración del proyecto
El proyecto de ejemplo <github> usa Maven para la gestión de las dependencias. A continuación se describen las depedencias que se han utilizado.
Versiones utilizadas.
<properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> <sdc.version>3.23.0</sdc.version> <junit.version>5.9.3</junit.version> <h2.version>2.2.220</h2.version> </properties>
Librería JUnit, en este caso estamos utilizando JUnit 5.
<dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency>
En este caso, dado que estamos probando la librería JDBC del dataflow, necesitamos declarar la libería que tiene los componentes que vamos a probar. En un proyecto de desarrollo de componentes esto no sería necesario ya que los test estarían en el propio proyecto.
<dependency> <groupId>com.streamsets</groupId> <artifactId>streamsets-datacollector-stagesupport</artifactId> <version>${sdc.version}</version> <scope>test</scope> </dependency>
En muchos casos necesitaremos librerías adicionales si queremos probar las conexiones externas. Esto dependerá del tipo de test que se quiera realizar, unitarios con mocks, de integración con los sistemas externos. En este caso lo vamos a hacer utilizando una base de datos para probar las conexiones directamente. Para ello utilizamos la base de datos en memoria H2.
<dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <version>${h2.version}</version> <scope>test</scope> </dependency>
Finalmente, es necesario declarar las dependencias propias del dataflow que se utilizarán para ejecutar las estapas a probar y poder comprobar los resultados.
<dependency> <groupId>com.streamsets</groupId> <artifactId>streamsets-datacollector-api</artifactId> <version>${sdc.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>com.streamsets</groupId> <artifactId>streamsets-datacollector-sdk</artifactId> <version>${sdc.version}</version> <scope>test</scope> </dependency>
Prueba de Orígen
En este ejemplo para probar el origen JDBC arrancamos una base de datos H2 y la poblamos con unas tablas para probar.
@BeforeAll public static void setUp() throws SQLException { final String sdcId = "testingID"; Utils.setSdcIdCallable(new Callable<String>() { @Override public String call() { return sdcId; } }); // Create a table in H2 and put some data in it for querying. connection = DriverManager.getConnection(h2ConnectionString, username, password); try (Statement statement = connection.createStatement()) { // Setup table statement.addBatch("CREATE SCHEMA IF NOT EXISTS TEST;"); statement.addBatch( "CREATE TABLE IF NOT EXISTS "+fullTableName+" " + "(p_id INT NOT NULL, first_name VARCHAR(255), last_name VARCHAR(255));" ); // Add some data statement.addBatch("INSERT INTO "+fullTableName+" VALUES (1, 'Adam', 'Kunicki')"); statement.addBatch("INSERT INTO "+fullTableName+" VALUES (2, 'Jon', 'Natkins')"); statement.addBatch("INSERT INTO "+fullTableName+" VALUES (3, 'Jon', 'Daulton')"); statement.addBatch("INSERT INTO "+fullTableName+" VALUES (4, 'Girish', 'Pancha')"); statement.executeBatch(); } }
Del mismo modo, al finalizar el test limpiamos la base de datos.
@AfterAll public static void tearDown() throws SQLException { try (Statement statement = connection.createStatement()) { // Setup table statement.execute("DROP TABLE IF EXISTS "+fullTableName+";"); statement.execute("DROP SCHEMA IF EXISTS "+schema+";"); } // Last open connection terminates H2 connection.close(); }
Para realizar la prueba lo primero que hacemos es crear un componente de tipo origen JDBC
JdbcSource origin = new JdbcSource( true, query, initialOffset, "P_ID", false, "", 1000, JdbcRecordType.LIST_MAP, new CommonSourceConfigBean(queriesPerSecond, BATCH_SIZE, CLOB_SIZE, CLOB_SIZE), false, "", createConfigBean(h2ConnectionString, username, password), UnknownTypeAction.STOP_PIPELINE, queryInterval );
Los valores de inicialización dependen del tipo de componente, por lo que los utilizados en este ejemplo son los específicos del origen de JDBC.
Lo siguiente es crear un objeto SourceRunner que nos va a permitir ejecutar las “lecturas” del origen.
SourceRunner runner = new SourceRunner.Builder(JdbcDSource.class, origin) .addOutputLane("lane") .build(); runner.runInit();
Una vez hecho esto simulamos una secuencia de lecturas, y si queremos escrituras, de la base de datos con el origen. Además nos aseguramos de liberar los recursos del runner cuanto terminamos de ejecutar el test.
try { // Check that existing rows are loaded. StageRunner.Output output = runner.runProduce(null, 2); Map<String, List<Record>> recordMap = output.getRecords(); List<Record> parsedRecords = recordMap.get("lane"); assertEquals(2, parsedRecords.size()); assertEquals("2", output.getNewOffset()); // Check that the remaining rows in the initial cursor are read. output = runner.runProduce(output.getNewOffset(), 100); parsedRecords = output.getRecords().get("lane"); assertEquals(2, parsedRecords.size()); // Check that new rows are loaded. runInsertNewRows(); output = runner.runProduce(output.getNewOffset(), 100); parsedRecords = output.getRecords().get("lane"); assertEquals(2, parsedRecords.size()); assertEquals("10", output.getNewOffset()); // Check that older rows are not loaded. runInsertOldRows(); output = runner.runProduce(output.getNewOffset(), 100); parsedRecords = output.getRecords().get("lane"); assertEquals(0, parsedRecords.size()); } finally { runner.runDestroy(); }
No merece la pena detallar el test porque es específico del componente que estamos probando en el ejemplo. Lo importante es fijarse en cómo se usa el objeto de la clase SourceRunner (runner en el ejemplo) para ir produciendo registros. También es útil fijarse en cómo se pueden comprobar los valores de los datos obtenidos para poder comprobar que los valores obtenidos son los esperados.
El ejemplo completo se puede ver en <github>
Prueba de Destino
Para probar la escritura en el destino se ha usado una estrategia similar a la empleada para probar la lectura. Por lo que no vamos a repetir la parte de código similar de configurar la base de datos. Para ver el ejemplo completo se puede ver en <github>.
Las mayores diferencias es que se creará un objeto de tipo destino, JdbcTarget, en nuesto ejemplo.
Target target = new JdbcTarget( schema, tableName, fieldMappings, false, false, false, JdbcMultiRowRecordWriter.UNLIMITED_PARAMETERS, ChangeLogFormat.NONE, JDBCOperationType.INSERT, UnsupportedOperationAction.DISCARD, createConfigBean(h2ConnectionString, username, password), Collections.emptyList());
Se crea un TargetRunner para ejecutar la escritura. Además, para para escribir hay que crear los registros que formarían el batch de escritura. Finalmente, se usa el TargetRunner para escribir en el destino.
TargetRunner targetRunner = new TargetRunner.Builder(JdbcDTarget.class, target).build(); Record record = RecordCreator.create(); List<Field> fields = new ArrayList<>(); fields.add(Field.create(1)); fields.add(Field.create("Adam")); fields.add(Field.create("Kunicki")); fields.add(Field.createDatetime(new Instant().toDate())); record.set(Field.create(fields)); List<Record> singleRecord = Collections.unmodifiableList(Arrays.asList(record)); targetRunner.runInit(); targetRunner.runWrite(singleRecord);
Finalmente hay comprobar que se ha escrito en el destino. En este ejemplo comprobando si se ha escrito en la base de datos.
connection = DriverManager.getConnection(h2ConnectionString, username, password); try (Statement statement = connection.createStatement()) { ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM " + fullTableName); rs.next(); assertEquals(1, rs.getInt(1)); }
Prueba de un Processor
Este caso es muy similar al de los Orígenes y los Destinos. La mayor diferencia radica en que se utiliza un componente de tipo processor y un runner de tipo ProcessorRunner.
JdbcLookupDProcessor processor = createProcessor(); ProcessorRunner processorRunner = new ProcessorRunner.Builder(JdbcLookupDProcessor.class, processor) .addConfiguration("query", listQuery).addConfiguration("columnMappings", columnMappings) .addConfiguration("multipleValuesBehavior", MultipleValuesBehavior.FIRST_ONLY) .addConfiguration("missingValuesBehavior", MissingValuesBehavior.SEND_TO_ERROR) .addConfiguration("maxClobSize", 1000).addConfiguration("maxBlobSize", 1000) .addConfiguration("validateColumnMappings", true).addOutputLane("lane").build();
El resto de comprobaciones son similares a las de destino y origen. Se puede ver el ejemplo completo en <github>.