Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Introducción

Para aumentar las capacidades del Dataflow es posible desarrollar etapas personalizadas que se pueden utilizar en los pipelines que se creen con el propio Dataflow del mismo modo que cualquiera de los componentes propios del Dataflow.

Este artículo describe cómo se pueden hacer test automáticos con JUnit para probar las esatapas personalizadas que se desarrollen.

Para mostrar ejemplos, usaremos el proyecto <github>. Dicho proyecto incluye ejemplos de pruebas para los componentes JDBC del propio Dataflow. De esta forma muestra ejemplos tanto para etapas de tipo origen, de tipo destino y de tipo procesadores.

Configuración del proyecto

El proyecto de ejemplo <github> usa Maven para la gestión de las dependencias. A continuación se describen las depedencias que se han utilizado.

Versiones utilizadas.

<properties>
	<maven.compiler.target>1.8</maven.compiler.target>
	<maven.compiler.source>1.8</maven.compiler.source>
	<sdc.version>3.23.0</sdc.version>
	<junit.version>5.9.3</junit.version>
	<h2.version>2.2.220</h2.version>
</properties>

Librería JUnit, en este caso estamos utilizando JUnit 5.

<dependency>
    <groupId>org.junit.jupiter</groupId>
	<artifactId>junit-jupiter-api</artifactId>
	<version>${junit.version}</version>
	<scope>test</scope>
</dependency>

En este caso, dado que estamos probando la librería JDBC del dataflow, necesitamos declarar la libería que tiene los componentes que vamos a probar. En un proyecto de desarrollo de componentes esto no sería necesario ya que los test estarían en el propio proyecto.

<dependency>
    <groupId>com.streamsets</groupId>
	<artifactId>streamsets-datacollector-stagesupport</artifactId>
	<version>${sdc.version}</version>
	<scope>test</scope>
</dependency>

En muchos casos necesitaremos librerías adicionales si queremos probar las conexiones externas. Esto dependerá del tipo de test que se quiera realizar, unitarios con mocks, de integración con los sistemas externos. En este caso lo vamos a hacer utilizando una base de datos para probar las conexiones directamente. Para ello utilizamos la base de datos en memoria H2.

<dependency>
    <groupId>com.h2database</groupId>
	<artifactId>h2</artifactId>
	<version>${h2.version}</version>
	<scope>test</scope>
</dependency>

Finalmente, es necesario declarar las dependencias propias del dataflow que se utilizarán para ejecutar las estapas a probar y poder comprobar los resultados.

<dependency>
	<groupId>com.streamsets</groupId>
	<artifactId>streamsets-datacollector-api</artifactId>
	<version>${sdc.version}</version>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>com.streamsets</groupId>
	<artifactId>streamsets-datacollector-sdk</artifactId>
	<version>${sdc.version}</version>
	<scope>test</scope>
</dependency>

Prueba de Orígen

En este ejemplo para probar el origen JDBC arrancamos una base de datos H2 y la poblamos con unas tablas para probar.

@BeforeAll
public static void setUp() throws SQLException {
	final String sdcId = "testingID";
	Utils.setSdcIdCallable(new Callable<String>() {
		@Override
		public String call() {
			return sdcId;
		}
	});

	// Create a table in H2 and put some data in it for querying.
	connection = DriverManager.getConnection(h2ConnectionString, username, password);
	try (Statement statement = connection.createStatement()) {
		// Setup table
		statement.addBatch("CREATE SCHEMA IF NOT EXISTS TEST;");
		statement.addBatch(
				"CREATE TABLE IF NOT EXISTS "+fullTableName+" " +
						"(p_id INT NOT NULL, first_name VARCHAR(255), last_name VARCHAR(255));"
				);

		// Add some data
		statement.addBatch("INSERT INTO "+fullTableName+" VALUES (1, 'Adam', 'Kunicki')");
		statement.addBatch("INSERT INTO "+fullTableName+" VALUES (2, 'Jon', 'Natkins')");
		statement.addBatch("INSERT INTO "+fullTableName+" VALUES (3, 'Jon', 'Daulton')");
		statement.addBatch("INSERT INTO "+fullTableName+" VALUES (4, 'Girish', 'Pancha')");

		statement.executeBatch();
	}
}

Del mismo modo, al finalizar el test limpiamos la base de datos.

@AfterAll
public static void tearDown() throws SQLException {
	try (Statement statement = connection.createStatement()) {
		// Setup table
		statement.execute("DROP TABLE IF EXISTS "+fullTableName+";");
		statement.execute("DROP SCHEMA IF EXISTS "+schema+";");
	}

	// Last open connection terminates H2
	connection.close();
}

Para realizar la prueba lo primero que hacemos es crear un componente de tipo origen JDBC

JdbcSource origin = new JdbcSource(
				true,
				query,
				initialOffset,
				"P_ID",
				false,
				"",
				1000,
				JdbcRecordType.LIST_MAP,
				new CommonSourceConfigBean(queriesPerSecond, BATCH_SIZE, CLOB_SIZE, CLOB_SIZE),
				false,
				"",
				createConfigBean(h2ConnectionString, username, password),
				UnknownTypeAction.STOP_PIPELINE,
				queryInterval
				);

Los valores de inicialización dependen del tipo de componente, por lo que los utilizados en este ejemplo son los específicos del origen de JDBC.

Lo siguiente es crear un objeto SourceRunner que nos va a permitir ejecutar las “lecturas” del origen.

SourceRunner runner = new SourceRunner.Builder(JdbcDSource.class, origin)
		.addOutputLane("lane")
		.build();

runner.runInit();

Una vez hecho esto simulamos una secuencia de lecturas, y si queremos escrituras, de la base de datos con el origen. Además nos aseguramos de liberar los recursos del runner cuanto terminamos de ejecutar el test.

try {
	// Check that existing rows are loaded.
	StageRunner.Output output = runner.runProduce(null, 2);
	Map<String, List<Record>> recordMap = output.getRecords();
	List<Record> parsedRecords = recordMap.get("lane");

	assertEquals(2, parsedRecords.size());
	assertEquals("2", output.getNewOffset());

	// Check that the remaining rows in the initial cursor are read.
	output = runner.runProduce(output.getNewOffset(), 100);
	parsedRecords = output.getRecords().get("lane");
	assertEquals(2, parsedRecords.size());

	// Check that new rows are loaded.
	runInsertNewRows();
	output = runner.runProduce(output.getNewOffset(), 100);
	parsedRecords = output.getRecords().get("lane");

	assertEquals(2, parsedRecords.size());
	assertEquals("10", output.getNewOffset());

	// Check that older rows are not loaded.
	runInsertOldRows();
	output = runner.runProduce(output.getNewOffset(), 100);
	parsedRecords = output.getRecords().get("lane");
	assertEquals(0, parsedRecords.size());
} finally {
	runner.runDestroy();
}

No merece la pena detallar el test porque es específico del componente que estamos probando en el ejemplo. Lo importante es fijarse en cómo se usa el objeto de la clase SourceRunner (runner en el ejemplo) para ir produciendo registros. También es útil fijarse en cómo se pueden comprobar los valores de los datos obtenidos para poder comprobar que los valores obtenidos son los esperados.

El ejemplo completo se puede ver en <github>

Prueba de Destino

Para probar la escritura en el destino se ha usado una estrategia similar a la empleada para probar la lectura. Por lo que no vamos a repetir la parte de código similar de configurar la base de datos. Para ver el ejemplo completo se puede ver en <github>.

Las mayores diferencias es que se creará un objeto de tipo destino, JdbcTarget, en nuesto ejemplo.

Target target = new JdbcTarget(
		schema, 
		tableName, 
		fieldMappings, 
		false, 
		false, 
		false,
		JdbcMultiRowRecordWriter.UNLIMITED_PARAMETERS, 
		ChangeLogFormat.NONE, 
		JDBCOperationType.INSERT,
		UnsupportedOperationAction.DISCARD, 
		createConfigBean(h2ConnectionString, username, password),
		Collections.emptyList());

Se crea un TargetRunner para ejecutar la escritura. Además, para para escribir hay que crear los registros que formarían el batch de escritura. Finalmente, se usa el TargetRunner para escribir en el destino.

TargetRunner targetRunner = new TargetRunner.Builder(JdbcDTarget.class, target).build();

Record record = RecordCreator.create();
List<Field> fields = new ArrayList<>();
fields.add(Field.create(1));
fields.add(Field.create("Adam"));
fields.add(Field.create("Kunicki"));
fields.add(Field.createDatetime(new Instant().toDate()));
record.set(Field.create(fields));

List<Record> singleRecord = Collections.unmodifiableList(Arrays.asList(record));
targetRunner.runInit();
targetRunner.runWrite(singleRecord);

Finalmente hay comprobar que se ha escrito en el destino. En este ejemplo comprobando si se ha escrito en la base de datos.

connection = DriverManager.getConnection(h2ConnectionString, username, password);
try (Statement statement = connection.createStatement()) {
	ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM " + fullTableName);
	rs.next();
	assertEquals(1, rs.getInt(1));
}

Prueba de un Processor

Este caso es muy similar al de los Orígenes y los Destinos. La mayor diferencia radica en que se utiliza un componente de tipo processor y un runner de tipo ProcessorRunner.

JdbcLookupDProcessor processor = createProcessor();

ProcessorRunner processorRunner = new ProcessorRunner.Builder(JdbcLookupDProcessor.class, processor)
		.addConfiguration("query", listQuery).addConfiguration("columnMappings", columnMappings)
		.addConfiguration("multipleValuesBehavior", MultipleValuesBehavior.FIRST_ONLY)
		.addConfiguration("missingValuesBehavior", MissingValuesBehavior.SEND_TO_ERROR)
		.addConfiguration("maxClobSize", 1000).addConfiguration("maxBlobSize", 1000)
		.addConfiguration("validateColumnMappings", true).addOutputLane("lane").build();

El resto de comprobaciones son similares a las de destino y origen. Se puede ver el ejemplo completo en <github>.

  • No labels