Creation of a pipeline for public data intake in the platform
In this tutorial, we will create a flow that accesses the JCDecaux public API, polling it for the status of bicycle stations in several cities. The flow will filter the resulting data to keep only the records from the city of Seville; it will then keep only the fields we consider useful, rename some of them with more fitting names, and insert the result into an ontology in the platform.
The first step is to create an ontology to serve as the data destination, along with a device to insert the data from the platform destination node. The ontology will be named "bikesStreamsets", and its type will be GENERAL → EmptyBase:
We will add these fields to the ontology:
- city → string
- id → integer
- bike_stands → integer
- available_bikes → integer
We can see an instance of this ontology with the "Generate Instance" button:
{"bikesStreamsets":{ "city":"string","id":1,"bike_stands":1,"available_bikes":1}}
Once the ontology is created, we will create a device with write permissions on it. The device will be named devdevicepl:
With this done, and taking note of the device, token, and ontology data, we will create a pipeline that carries out the whole ontology insertion process.
From a user with the analytics/datascientist role, we go to the menu Analytics Tools → My pipelines and click the "Create Pipeline" button. We will name it "restBikesIngest".
We are taken to the pipeline editing screen, where we can visually edit the flow:
The first node we will include is the data origin. Browsing the origins section of the node palette, we can see all the available options. We select the HTTP Client origin so we can make requests to the JCDecaux public API.
We will configure the node to make a "GET" polling request every 3000 milliseconds (3 seconds) to the API: https://api.jcdecaux.com/vls/v1/stations?apiKey=f1681fc91ab6c7cffa878aeaeb31a787a6a36c48 . The JCDecaux website provides the apiKey. We will leave the other options at their default values.
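To make the origin's behavior concrete, here is a minimal Python sketch of what the HTTP Client node does: a plain GET request repeated every 3 seconds. The function names are ours, for illustration; the URL and interval are the ones configured in the node.

```python
import json
import time
import urllib.request

# Same URL and polling interval as configured in the HTTP Client origin.
API_URL = ("https://api.jcdecaux.com/vls/v1/stations"
           "?apiKey=f1681fc91ab6c7cffa878aeaeb31a787a6a36c48")
POLL_INTERVAL_MS = 3000

def parse_stations(payload: str) -> list:
    """The API answers with a JSON array of station objects."""
    return json.loads(payload)

def poll_forever():
    """Poll the API every POLL_INTERVAL_MS milliseconds, like the origin node."""
    while True:
        with urllib.request.urlopen(API_URL) as resp:  # plain GET request
            stations = parse_stations(resp.read().decode("utf-8"))
        print(f"fetched {len(stations)} stations")
        time.sleep(POLL_INTERVAL_MS / 1000)

if __name__ == "__main__":
    poll_forever()
```

In the pipeline, of course, this loop is handled by the origin node itself; the sketch only shows the request pattern.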
If we launch this URL from Postman or from the browser itself (which makes a GET request), we can see the result it returns:
We can see that the structure is an array of JSON elements, where every element has several fields. An individual example instance would be:
{
  "number": 9087,
  "name": "9087-MAZARGUES",
  "address": "MAZARGUES - ROND POINT DE MAZARGUES (OBELISQUE)",
  "position": {
    "lat": 43.250903869637334,
    "lng": 5.403244616491982
  },
  "banking": true,
  "bonus": false,
  "status": "OPEN",
  "contract_name": "Marseille",
  "bike_stands": 21,
  "available_bike_stands": 19,
  "available_bikes": 2,
  "last_update": 1532002008000
}
The end result we want is as follows:
{
  "bikesStreamsets": → Ontology name
  {
    "city": "string", → Renamed from contract_name
    "id": 1, → Renamed from number
    "bike_stands": 1, → Original field
    "available_bikes": 1 → Original field
  }
}
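The combined effect of the field removal and renaming can be sketched as a single mapping function. This is an illustrative Python equivalent, not platform code; the helper name is ours.

```python
def to_ontology_instance(record: dict) -> dict:
    """Map a raw JCDecaux station record to the bikesStreamsets shape:
    keep only the four useful fields and rename contract_name -> city
    and number -> id. All other fields are dropped."""
    return {
        "bikesStreamsets": {
            "city": record["contract_name"],
            "id": record["number"],
            "bike_stands": record["bike_stands"],
            "available_bikes": record["available_bikes"],
        }
    }

sample = {
    "number": 9087,
    "contract_name": "Marseille",
    "bike_stands": 21,
    "available_bikes": 2,
    "status": "OPEN",  # dropped, like the other extra fields
}
print(to_ontology_instance(sample))
```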
We also want only the records whose contract is "Seville". For the sake of efficiency, this will be the first step in the flow once the data has been retrieved. To do it, we will use a Stream Selector processor, which divides the flow into two streams: one with contract_name "Seville", and another that sends the discarded records to a "Trash" node:
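The Stream Selector's routing logic amounts to a simple predicate on contract_name. A Python sketch of the split, with a function name of our choosing:

```python
def split_by_contract(records, contract="Seville"):
    """Route records matching the contract into the main stream;
    everything else goes to the 'trash' stream, like the Trash node."""
    kept, trash = [], []
    for r in records:
        (kept if r.get("contract_name") == contract else trash).append(r)
    return kept, trash

records = [
    {"number": 1, "contract_name": "Seville"},
    {"number": 2, "contract_name": "Marseille"},
]
kept, trash = split_by_contract(records)
print(len(kept), len(trash))  # → 1 1
```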
Next, we will keep only the fields needed for the destination ontology, using a Field Remover node:
In the next step, we will rename the fields to match the ontology's field names. To do this, we will use a Field Renamer node.
Finally, we will use a onesait Platform destination node, where we will configure the destination ontology, the host, and the device data. Additionally, we will set a bulk size of 500 records, use the ontology name as the root node, and use a single insertion thread, which will be enough for this process:
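The 500-record bulk setting means the destination groups records before inserting them. A short Python sketch of that grouping (the helper name is ours; the actual insertion is handled by the destination node):

```python
def chunk(records, size=500):
    """Group records into bulks of `size`, mirroring the destination
    node's 500-record bulk setting."""
    for i in range(0, len(records), size):
        yield records[i:i + size]

# Each bulk would then be inserted into the platform in one call;
# here we only show the grouping.
bulks = list(chunk(list(range(1200)), size=500))
print([len(b) for b in bulks])  # → [500, 500, 200]
```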
The final flow will look like this:
By running the whole process, we can see its statistics in real time, such as the intake throughput or the total number of inserted records:
Finally, using the "Query Tool", we can see the instances that the process has inserted into the ontology: