KSQL como motor de streaming sobre SQL

Introducción a KSQL

KSQL es un motor de streaming open-source que permite el procesamiento de información en tiempo real mediante el uso de lenguaje SQL sobre Apache Kafka, evitando la necesidad de programar código.

Facilita la lectura, escritura y procesamiento de flujos de datos en tiempo real, de manera fiable, distribuida y escalable. Soporta una gran cantidad de operaciones incluyendo entre ellas filtrado, agregaciones, joins, tratamiento por ventanas y conversión a sesiones.

Antes de nada, para aquellos que no conozcan KSQL, dejadme que explique brevemente los componentes principales que intervienen.

  1.  STREAM: Un stream es una secuencia sin fin de datos estructurados. Estos hechos o eventos son inmutables, lo que significa que nuevos eventos pueden ser insertados pero nunca podrán ser actualizados o borrados. Los streams pueden ser creados a partir de un tópico (CS) o derivados de otros streams o tablas existentes (CSAS). Un ejemplo de un stream podría ser la secuencia de eventos que representan las transacciones financieras:

    CREATE STREAM transactions (emmiter VARCHAR, receiver VARCHAR, amount DOUBLE) WITH (kafka_topic='transactions',value_format='JSON');

     

  2. TABLE: Una tabla es una vista de un STREAM o TABLE que representa una colección de eventos que evolucionan en el tiempo. Podríamos decir que es el equivalente a una tabla en una base de datos tradicional, pero enriquecida con características del streaming como el procesamiento de ventanas temporales. Al igual que los STREAMS, una tabla puede ser creada a partir de un tópico Kafka (CT) o derivados de otros streams o tablas existentes (CTAS). Siguiendo el ejemplo del stream, una tabla podría almacenar el balance total de una persona. Con la secuencia de eventos de transacciones, la tabla se puede ir actualizando para que contenga el balance total de su cuenta:

    CREATE TABLE accountBalance as SELECT receiver as user, sum(amount) as balance from transactions;

     

Uso de KSQL en la plataforma

El uso de KSQL en la plataforma está pensado para poder crear flujos que lean los datos que llegan a una ontología, hacer operativas en tiempo real sobre ellos y almacenarlos en una ontología distinta para poder seguir trabajando con ellos en el sistema. Un flujo agrupará los recursos KSQL que engloban una funcionalidad concreta.

Para crear un flujo KSQL en la plataforma, ve al menú: Development > My SQL Streams

Una vez en la pantalla de gestión de flujos KSQL, haz clic en "crear":

Rellena los campos de nombre y descripción para el flujo y haz clic en "Mew" para crear el flujo:

A continuación edita el Flujo para añadir recursos, haciendo clic en el botón:

En esta pantalla podrás modificar la descripción del flujo y gestionar (crear, modificar o borrar) los distintos recursos KSQL asociados al mismo.

Para ver los distintos tipos de recursos KSQL en la plataforma vamos a usar un caso práctico. Imaginemos que, para cada transacción, a cada usuario se le carga una comisión distinta (las comisiones estarán en un tópico kafka llamado "commissions"). Queremos guardar en una ontología las operaciones de transferencia de cada usuario con los importes + comisiones.

En la plataforma tendremos TRES tipos de recursos KSQL:

  • ORIGENES.

  • DESTINOS.

  • PROCESOS.

Vamos a ir creándolos según el supuesto anterior. 


Orígenes

ORIGENES: Son STREAMS que representan la información que llegan en tiempo real a las ontologías. En nuestro caso, estos STREAMS son creados a partir de un tópico (CS). Para facilitar la integración con las ontologías, el código asociado a la creación del STREAM se creará automáticamente y se mostrará para que pueda verse el formato de los datos. El usuario simplemente tendrá que seleccionar de qué ontología quiere leer los datos y el recurso se creará automáticamente.

Supongamos que tenemos una ontología que recibe los datos de transferencias en el siguiente formato:

Un ejemplo del dato podría ser: 

{"emmiter":"John","receiver":"Peter","amount":28.6}

{"emmiter":"John","receiver":"Peter","amount":28.6}

Para crear un origen que lea los datos de esta ontología, una vez hayamos entrado en la pantalla de actualización del flujo KSQL, haremos click en crear y seleccionaremos el tipo de recurso ORIGIN y la ontología de la que leerá el recurso.

Destinos

DESTINOS: Son STREAMS que representan el paso de datos del flujo KSQL a una ontología. En nuestro caso, estos STREAMS son creados a partir de un tópico. Para facilitar la integración con las ontologías, el código asociado a la creación del STREAM se creará automáticamente como plantilla y podrá ser editable por el usuario. Es importante recalcar que si se cambian el nombre del tópico Kafka o la estructura de campos, es posible que los datos no lleguen a la plataforma, así que se recomienda no modificar la sentencia salvo casos excepcionales (cambio de KEY o alguna propiedad adicional)

Supongamos que queremos almacenar en una ontología el importe de las transferencias más las comisiones aplicadas. El formato será:

Es imprescindible que la ontología haya sido creada con la opción de "creación de tópico Kafka". Además, para nuestro caso, necesitaremos que los campos estén en mayúsculas. Esto es por requerimiento de KSQL. Como vamos a recibir la información de un cruce (JOIN), el resultado viene particionado por una clave (client en nuestro caso). Para KSQL, siempre que se indique una clave, ha de ser un campo en mayúscula.

Para crear un destino, en la pantalla de creación de recursos KSQL, selecciona el tipo DESTINY y la ontología donde quieres almacenar el resultado de tu flujo. Añade al flujo la "KEY" ya que vas a recibir la información de un cruce por "client". Además, elimina de la plantilla los acentos invertidos del nombre de los campos. En el caso de que el destino no recibiera los datos de un cruce, no sería necesario alterar la sentencia:

Procesos

PROCESOS: Son todos aquellos STREAMS o TABLES que no representan ni lectura o escritura desde/hacia una ontología. En este caso, el usuario creará las consultas KSQL de manera libre. Para nuestro ejemplo, tendremos que crear varios recursos:

  • Tabla para acceder a las comisiones de cada usuario:

Supongamos que tenemos las comisiones por usuario en el tópico "commissions" en formato JSON de la siguiente manera:

{"client":"Peter","commission":1.23} 
{"client":"Robert","commision":0.94}

{"client":"Peter","commission":1.23} 
{"client":"Robert","commision":0.94}

Crearemos una serie de recursos de tipo PROCESS con el siguiente objectivo:

  • Stream que lee los datos de nuestro origen y los particiona (obligatorio para hacer JOINS). Crearemos un STREAM como tipo PROCESS de la siguiente manera:

  • Creación de la tabla de comisiones a partir del tópico:

  • Inserción en el destino mediante el cruce de los datos del stream anterior y la tabla de comisiones. Por último haremos un LEFT JOIN entre los datos del STREAM "transactionKey" y la tabla "commissions", sumando los importes e insertándolos al destino creado:

Una vez creados todos nuestros recursos KSQL en el flujo, cada vez que se inserte un dato en la ontología de transacciones, se calculará de manera automática la aplicación de comisiones, almacenándose el resultado en la ontología transactionsWithCommissions.