...
To show examples, we will use a project whose full code can be found on GitHub. The project contains tests for Dataflow's own JDBC components, so it covers all three stage types: source, target and processor.
...
Project Configuration
The example project on GitHub uses Maven for dependency management. The dependencies used are described below.
Versions used:
Code Block |
---|
<properties>
  <maven.compiler.target>1.8</maven.compiler.target>
  <maven.compiler.source>1.8</maven.compiler.source>
  <sdc.version>3.23.0</sdc.version>
  <junit.version>5.9.3</junit.version>
  <h2.version>2.2.220</h2.version>
</properties> |
The JUnit library; this example uses JUnit 5.
Code Block |
---|
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter-api</artifactId>
  <version>${junit.version}</version>
  <scope>test</scope>
</dependency> |
In this case, since we are testing the Dataflow JDBC library, we need to declare the library that contains the components we are going to test. In a component development project this declaration would not be necessary, because the tests would live in the same project as the components.
Code Block |
---|
<dependency>
  <groupId>com.streamsets</groupId>
  <artifactId>streamsets-datacollector-stagesupport</artifactId>
  <version>${sdc.version}</version>
  <scope>test</scope>
</dependency> |
In many cases, we will need additional libraries to test external connections. Whether they are needed depends on the type of test: unit tests with mocks, or integration tests against external systems. In this example, we test the connections directly against a database, using the H2 in-memory database.
Code Block |
---|
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <version>${h2.version}</version>
  <scope>test</scope>
</dependency> |
Finally, we need to declare the Dataflow dependencies that are used to execute the stages under test and to verify the results.
Code Block | ||
---|---|---|
| ||
<dependency>
  <groupId>com.streamsets</groupId>
  <artifactId>streamsets-datacollector-api</artifactId>
  <version>${sdc.version}</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>com.streamsets</groupId>
  <artifactId>streamsets-datacollector-sdk</artifactId>
  <version>${sdc.version}</version>
  <scope>test</scope>
</dependency> |
...
Source Test
In this example, to test the JDBC source, we start an H2 database and populate it with some test tables.
Code Block | ||
---|---|---|
| ||
@BeforeAll
public static void setUp() throws SQLException {
  final String sdcId = "testingID";
  Utils.setSdcIdCallable(new Callable<String>() {
    @Override
    public String call() {
      return sdcId;
    }
  });

  // Create a table in H2 and put some data in it for querying.
  connection = DriverManager.getConnection(h2ConnectionString, username, password);
  try (Statement statement = connection.createStatement()) {
    // Set up the schema and table
    statement.addBatch("CREATE SCHEMA IF NOT EXISTS TEST;");
    statement.addBatch(
        "CREATE TABLE IF NOT EXISTS " + fullTableName + " "
            + "(p_id INT NOT NULL, first_name VARCHAR(255), last_name VARCHAR(255));"
    );
    // Add some data
    statement.addBatch("INSERT INTO " + fullTableName + " VALUES (1, 'Adam', 'Kunicki')");
    statement.addBatch("INSERT INTO " + fullTableName + " VALUES (2, 'Jon', 'Natkins')");
    statement.addBatch("INSERT INTO " + fullTableName + " VALUES (3, 'Jon', 'Daulton')");
    statement.addBatch("INSERT INTO " + fullTableName + " VALUES (4, 'Girish', 'Pancha')");
    statement.executeBatch();
  }
} |
Similarly, at the end of the test, we clean up the database.
Code Block | ||
---|---|---|
| ||
@AfterAll
public static void tearDown() throws SQLException {
  try (Statement statement = connection.createStatement()) {
    // Drop the test table and schema
    statement.execute("DROP TABLE IF EXISTS " + fullTableName + ";");
    statement.execute("DROP SCHEMA IF EXISTS " + schema + ";");
  }
  // Last open connection terminates H2
  connection.close();
} |
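The setup and teardown methods above rely on fields that the snippets do not show (h2ConnectionString, username, password, schema, fullTableName). The following is a minimal sketch of how such values might be defined, using only the JDK. The class name, the table name TEST_TABLE, the database name in the URL and the credentials are all assumptions made for illustration; the example project may define them differently.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for the fixture values used by the setup and teardown code.
class H2FixtureSketch {
    // Assumed names: the example project may use different ones.
    public static final String SCHEMA = "TEST";
    public static final String TABLE_NAME = "TEST_TABLE";
    public static final String FULL_TABLE_NAME = SCHEMA + "." + TABLE_NAME;

    // "jdbc:h2:mem:<name>" addresses a named in-memory H2 database.
    public static final String H2_CONNECTION_STRING = "jdbc:h2:mem:test";
    // "sa" with an empty password is the conventional H2 default account.
    public static final String USERNAME = "sa";
    public static final String PASSWORD = "";

    /** Statements that create the schema and table and insert the seed rows. */
    public static List<String> setupStatements() {
        return Arrays.asList(
            "CREATE SCHEMA IF NOT EXISTS " + SCHEMA,
            "CREATE TABLE IF NOT EXISTS " + FULL_TABLE_NAME
                + " (p_id INT NOT NULL, first_name VARCHAR(255), last_name VARCHAR(255))",
            "INSERT INTO " + FULL_TABLE_NAME + " VALUES (1, 'Adam', 'Kunicki')",
            "INSERT INTO " + FULL_TABLE_NAME + " VALUES (2, 'Jon', 'Natkins')",
            "INSERT INTO " + FULL_TABLE_NAME + " VALUES (3, 'Jon', 'Daulton')",
            "INSERT INTO " + FULL_TABLE_NAME + " VALUES (4, 'Girish', 'Pancha')");
    }

    /** Statements that undo setupStatements(): the table first, then the schema. */
    public static List<String> teardownStatements() {
        return Arrays.asList(
            "DROP TABLE IF EXISTS " + FULL_TABLE_NAME,
            "DROP SCHEMA IF EXISTS " + SCHEMA);
    }
}
```

Deriving every statement from the same constants keeps setup and teardown in agreement about which schema and table they touch.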
To run the test, we first create a JDBC source component.
Code Block | ||
---|---|---|
| ||
JdbcSource origin = new JdbcSource(
    true,
    query,
    initialOffset,
    "P_ID",
    false,
    "",
    1000,
    JdbcRecordType.LIST_MAP,
    new CommonSourceConfigBean(queriesPerSecond, BATCH_SIZE, CLOB_SIZE, CLOB_SIZE),
    false,
    "",
    createConfigBean(h2ConnectionString, username, password),
    UnknownTypeAction.STOP_PIPELINE,
    queryInterval
); |
The initialization values depend on the type of component, so those used in this example are specific to the JDBC source.
Next, we create a SourceRunner object, which will allow us to execute the "reads" from the source.
Code Block |
---|
SourceRunner runner = new SourceRunner.Builder(JdbcDSource.class, origin)
    .addOutputLane("lane")
    .build();
runner.runInit(); |
After this, we use the source to simulate a sequence of reads from the database, interleaved with writes to the database if needed. We also make sure to release the runner's resources once the test has finished.
Code Block |
---|
try {
  // Check that existing rows are loaded.
  StageRunner.Output output = runner.runProduce(null, 2);
  Map<String, List<Record>> recordMap = output.getRecords();
  List<Record> parsedRecords = recordMap.get("lane");
  assertEquals(2, parsedRecords.size());
  assertEquals("2", output.getNewOffset());

  // Check that the remaining rows in the initial cursor are read.
  output = runner.runProduce(output.getNewOffset(), 100);
  parsedRecords = output.getRecords().get("lane");
  assertEquals(2, parsedRecords.size());

  // Check that new rows are loaded.
  runInsertNewRows();
  output = runner.runProduce(output.getNewOffset(), 100);
  parsedRecords = output.getRecords().get("lane");
  assertEquals(2, parsedRecords.size());
  assertEquals("10", output.getNewOffset());

  // Check that older rows are not loaded.
  runInsertOldRows();
  output = runner.runProduce(output.getNewOffset(), 100);
  parsedRecords = output.getRecords().get("lane");
  assertEquals(0, parsedRecords.size());
} finally {
  runner.runDestroy();
} |
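The assertions above depend on offset-based reading: each call returns at most the requested number of rows whose key is greater than the last offset, and reports the key of the last row as the new offset. The toy model below illustrates that reasoning with plain JDK collections. It is not StreamSets code: the class OffsetReadSketch, its methods, and the ids used for the "new" and "old" rows are assumptions, since the helpers runInsertNewRows() and runInsertOldRows() are not shown in this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Toy in-memory model of an incremental, offset-based source (hypothetical).
class OffsetReadSketch {
    /** Result of one simulated produce call: the ids read and the offset to resume from. */
    public static final class Batch {
        public final List<Integer> ids;
        public final String newOffset;

        Batch(List<Integer> ids, String newOffset) {
            this.ids = ids;
            this.newOffset = newOffset;
        }
    }

    private final List<Integer> table = new ArrayList<>();

    /** Adds a row, identified only by its key, to the simulated table. */
    public void insert(int id) {
        table.add(id);
    }

    /** Returns up to maxBatchSize ids strictly greater than lastOffset, in ascending order. */
    public Batch produce(String lastOffset, int maxBatchSize) {
        // Assumption: a null offset means "start from 0".
        int from = (lastOffset == null) ? 0 : Integer.parseInt(lastOffset);
        List<Integer> ids = table.stream()
            .filter(id -> id > from)
            .sorted()
            .limit(maxBatchSize)
            .collect(Collectors.toList());
        // With no new rows the offset stays where it was.
        String newOffset = ids.isEmpty()
            ? String.valueOf(from)
            : String.valueOf(ids.get(ids.size() - 1));
        return new Batch(ids, newOffset);
    }
}
```

With ids 1 to 4 loaded, produce(null, 2) returns two ids and offset "2", and the next call returns the remaining two. After inserting ids 9 and 10 (assumed values), the following call returns two ids and offset "10". A row inserted afterwards with a lower id is never returned, which is the situation the final assertion in the test checks.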
The test itself does not need a detailed walkthrough, since it is specific to the component under test. What matters is how the SourceRunner object (runner in the example) is used to produce records, and how the returned data can be checked against the expected values.
The complete example is available on GitHub.
Destination Test
Writing to the destination is tested with a strategy similar to the one used for reading, so the database setup code is not repeated here. For the complete example, please refer to GitHub. The main difference is that a destination object is created, JdbcTarget in our example:
Code Block |
---|
Target target = new JdbcTarget(
    schema,
    tableName,
    fieldMappings,
    false,
    false,
    false,
    JdbcMultiRowRecordWriter.UNLIMITED_PARAMETERS,
    ChangeLogFormat.NONE,
    JDBCOperationType.INSERT,
    UnsupportedOperationAction.DISCARD,
    createConfigBean(h2ConnectionString, username, password),
    Collections.emptyList()); |
A TargetRunner is created to execute the write operation. To write data, we first create the records that form the write batch. Finally, the TargetRunner is used to write these records to the destination.
Code Block |
---|
TargetRunner targetRunner = new TargetRunner.Builder(JdbcDTarget.class, target).build();

// Build a single record whose root field is a list of four values.
Record record = RecordCreator.create();
List<Field> fields = new ArrayList<>();
fields.add(Field.create(1));
fields.add(Field.create("Adam"));
fields.add(Field.create("Kunicki"));
// Instant is presumably org.joda.time.Instant: java.time.Instant has no public constructor or toDate().
fields.add(Field.createDatetime(new Instant().toDate()));
record.set(Field.create(fields));
List<Record> singleRecord = Collections.unmodifiableList(Arrays.asList(record));

// Initialize the stage and write the batch.
targetRunner.runInit();
targetRunner.runWrite(singleRecord); |
Finally, we need to verify that the data has been written to the destination. In this example, we check that the row is present in the database.
Code Block |
---|
connection = DriverManager.getConnection(h2ConnectionString, username, password);
try (Statement statement = connection.createStatement()) {
  ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM " + fullTableName);
  rs.next();
  assertEquals(1, rs.getInt(1));
} |
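The source test releases its runner in a finally block, and the same pattern can be applied when writing to a destination so that the stage is destroyed even if the write or an assertion fails. The sketch below shows the idea with a hypothetical stand-in class. FakeRunner is not the SDK's TargetRunner; it only records which lifecycle methods were called, and the boolean flag that forces a failure is an assumption made for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates the init / write / destroy lifecycle with try/finally.
class RunnerLifecycleSketch {
    /** Hypothetical stand-in for an SDK runner; it only records lifecycle calls. */
    static final class FakeRunner {
        final List<String> events = new ArrayList<>();

        void runInit() {
            events.add("init");
        }

        void runWrite(boolean fail) {
            events.add("write");
            if (fail) {
                throw new IllegalStateException("simulated write failure");
            }
        }

        void runDestroy() {
            events.add("destroy");
        }
    }

    /** Runs the lifecycle once and returns the recorded events as a comma-separated string. */
    public static String exercise(boolean fail) {
        FakeRunner runner = new FakeRunner();
        runner.runInit();
        try {
            runner.runWrite(fail);
        } catch (IllegalStateException e) {
            // A real test would let the failure propagate; it is recorded here for inspection.
            runner.events.add("error");
        } finally {
            // Always executed, whether or not the write succeeded.
            runner.runDestroy();
        }
        return String.join(",", runner.events);
    }
}
```

In both the successful and the failing run, "destroy" is the last recorded event, which is the guarantee the finally block provides.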
...
Processor Test
This case is very similar to the source and destination tests. The main difference is that a processor component and a ProcessorRunner are used.
Code Block |
---|
JdbcLookupDProcessor processor = createProcessor();
ProcessorRunner processorRunner = new ProcessorRunner.Builder(JdbcLookupDProcessor.class, processor)
    .addConfiguration("query", listQuery)
    .addConfiguration("columnMappings", columnMappings)
    .addConfiguration("multipleValuesBehavior", MultipleValuesBehavior.FIRST_ONLY)
    .addConfiguration("missingValuesBehavior", MissingValuesBehavior.SEND_TO_ERROR)
    .addConfiguration("maxClobSize", 1000)
    .addConfiguration("maxBlobSize", 1000)
    .addConfiguration("validateColumnMappings", true)
    .addOutputLane("lane")
    .build(); |
The remaining checks are similar to those for sources and destinations. You can view the complete example on GitHub.
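As an illustration of what a lookup test typically has to check, the toy model below mimics the behavior that the names FIRST_ONLY and SEND_TO_ERROR suggest: when a key has several matches only the first is used, and a key with no match is routed to an error list instead of the output. This is an interpretation of the configuration names, not the processor's actual implementation, and the class LookupBehaviorSketch with its fields is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model of a lookup step with "first match only" and "missing goes to error" rules.
class LookupBehaviorSketch {
    /** Enriched records, rendered as "key:value" for easy comparison. */
    public final List<String> output = new ArrayList<>();
    /** Keys for which the lookup table had no match. */
    public final List<Integer> errors = new ArrayList<>();

    /** Looks up each key: the first match wins, keys without a match go to errors. */
    public static LookupBehaviorSketch process(Map<Integer, List<String>> table, List<Integer> keys) {
        LookupBehaviorSketch result = new LookupBehaviorSketch();
        for (Integer key : keys) {
            List<String> matches = table.get(key);
            if (matches == null || matches.isEmpty()) {
                result.errors.add(key);
            } else {
                result.output.add(key + ":" + matches.get(0));
            }
        }
        return result;
    }
}
```

A processor test would assert on both sides in the same way: the records that reached the output lane and the records that were sent to error.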