Creación de un pipeline para la ingesta de datos públicos en la plataforma

Creación de un pipeline para la ingesta de datos públicos en la plataforma

En este tutorial, vamos a crear un flujo que accede a un API pública de JCDecaux obteniendo el estado por polling de múltiples estaciones de bicis y en diferentes ciudades. Este flujo filtrará los datos obtenidos para quedarse sólo con los pertenecientes a Sevilla. Posteriormente se filtrarán los campos para quedarnos con los considerados útiles, se renombrarán algunos con un nombre más apropiado y se insertará en una ontología de la plataforma.

El primer paso será la creación de una ontología destino de los datos, así como un device para poder insertarlos desde el nodo destino de la plataforma. Esta ontología la denominaremos "bikesStreamsets" de tipo GENERAL → EmptyBase:

En la misma incluiremos los campos:

  • city → string

  • id → integer

  • bike_stands → integer

  • available_bikes → integer

Podemos observar una instancia de la misma con el botón "Generate Instance"

      {"bikesStreamsets":{ "city":"string","id":1,"bike_stands":1,"available_bikes":1}}

Una vez creada, pasaremos a crear un device con permisos de escritura sobre la misma, al que denominaremos devdevicepl:

Con esto creado y cogiendo los diferentes datos del device, token y ontología, pasaremos a crear un pipeline que pueda realizar todo el proceso de inserción en la ontología.

Desde un usuario de rol analytics/datascientics iremos al menú Analytics Tools → Mis pipelines y daremos al botón "Crear Pipeline" y le daremos un nombre "restBikesIngest"

Llegaremos a la pantalla de edición del pipeline donde podremos editar visualmente el flujo:

El primer nodo que vamos a incluir es el de origen de datos. Buscando en la paleta de nodos por orígenes, podemos ver todos los que hay. Seleccionaremos el de HTTP Client para poder hacer una petición al API pública de JCDecaux.

Este nodo lo configuraremos para que haga un pooling "GET" cada 3000 milisegundos (3 segundos) hacia el API https://api.jcdecaux.com/vls/v1/stations?apiKey=f1681fc91ab6c7cffa878aeaeb31a787a6a36c48 . El apiKey se puede obtener desde la propia página web de JCDecaux. El resto de opciones las dejaremos por defecto.

Si lanzamos desde un postman o el propio navegador (que realiza una petición GET) esa url, podemos ver el resultado que devuelve.

Vemos que la estructura es un array de elementos json, en los que cada elemento json tiene diferentes campos. Individualmente una instancia ejemplo sería:

{
"number":9087,
"name":"9087-MAZARGUES",
"address":"MAZARGUES - ROND POINT DE MAZARGUES (OBELISQUE)",
"position":
{
"lat":43.250903869637334,
"lng":5.403244616491982
},
"banking":true,
"bonus":false,
"status":"OPEN",
"contract_name":"Marseille",
"bike_stands":21,
"available_bike_stands":19,
"available_bikes":2,
"last_update":1532002008000
}

El resultado final que queremos tener es obtener es el siguiente:

{

"bikesStreamsets":    → Nombre de la ontología

     {

     "city":"string",    → Mismo campo que contract_name

     "id":integer,      → Mismo campo que number

     "bike_stands":integer,   → Campo original

     "available_bikes":integer   → Campo original

     }

}

Además vamos a querer sólo los registros pertenecientes a "Seville". Por eficiencia, este será el primer paso en el flujo una vez recuperados los datos. Para ello usaremos un processor de tipo Stream Selector que dividirá el flujos en dos streams, uno con un contract_name de "Seville" y el resto serán desechados mediante un nodo "Trash":

Después, nos quedaremos sólo con los campos necesarios para la ontología destino con el nodo Field Remover:

 En el paso siguiente, renombramos los campos como lo especificado. Para ello usamos un nodo de tipo Field Renamer:

Finalmente usamos un nodo destino de la plataforma onesait platform, en el que configuramos la ontología destino, el host y los datos del device. Adicionalmente, incluiremos 500 registro en modo bulk, incluiremos el nombre de la ontología como rootnode y usaremos un hilo de inserción, que será suficiente para el proceso:

El flujo final queda de la siguiente forma.

Arrancando todo el proceso podemos ver las estadísticas del mismo, como el throughput de la inserción o los registros insertados totales y todo ello en tiempo real:

Finalmente desde la herramienta de "query tool" podemos observar las instancias insertadas en la ontología por el proceso: