ksql> SELECT STATIONREFERENCE, EAREGIONNAME
FROM ENVIRONMENT_DATA;
L2404 | North East
ksql> SELECT STATIONREFERENCE, EAREGIONNAME, MASK_RIGHT(EAREGIONNAME,4) AS REGION_NAME_MASKED
FROM ENVIRONMENT_DATA2;
L2404 | North East | North Xxxx
There are several MASK-based functions, and if you have your own special sauce you’d like to use here, KSQL does support user-defined functions (UDFs) as of Confluent Platform 5.0.
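If you did roll your own, it would be invoked just like the built-in functions once its JAR is placed in the KSQL server's extension directory. A minimal sketch, assuming a hypothetical UDF registered under the name REDACT_REGION:

-- REDACT_REGION is a hypothetical custom UDF, not a built-in function
SELECT STATIONREFERENCE, REDACT_REGION(EAREGIONNAME) AS REGION_NAME_MASKED
  FROM ENVIRONMENT_DATA;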
Recap
So far, we’ve ingested data from several sources with similar but varying data models. Using KSQL, we’ve accomplished data wrangling by (a consolidated sketch follows this list):
- Flattening nested data structures
- Reserializing JSON data to Avro
- Unifying the multiple streams into one
- Setting the message partitioning key
- Setting the message timestamp metadata to the correct logical value
- Creating derived columns in the transformation
- Filtering and masking the data
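As a recap in code form, several of these steps can be combined in a single persistent query. The following is only a sketch: the stream name ENVIRONMENT_DATA_CLEAN and the WHERE predicate are illustrative, while the columns and functions are the ones used in the examples above.

-- Illustrative sketch: the stream name and WHERE clause are examples only
CREATE STREAM ENVIRONMENT_DATA_CLEAN
  WITH (VALUE_FORMAT='AVRO') AS                              -- reserialize to Avro
  SELECT STATIONREFERENCE,
         EAREGIONNAME,
         MASK_RIGHT(EAREGIONNAME, 4) AS REGION_NAME_MASKED   -- derived, masked column
    FROM ENVIRONMENT_DATA
   WHERE PARAMETERNAME = 'Water Level'                       -- filter the data
  PARTITION BY STATIONREFERENCE;                             -- set the message key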
The result of these transformations is a set of continually populated Kafka topics. As new messages arrive on the source, continuously running KSQL statements process them and write the results to the target Kafka topics.
Streaming onwards…
The great thing about Kafka is that it lets us build systems in which functionality is compartmentalized. Ingest is handled by one process (in this case, Kafka Connect), and transformation by a series of KSQL statements. Each can be modified or swapped out for another without impacting the pipeline we’re building. Keeping them separate makes it easier to test, troubleshoot and analyze performance. It also means that we can extend data pipelines easily.
We may have a single use case in mind when initially building the pipeline, and one way to meet it would be to build a single application that pulls data from the REST endpoints, then cleanses, wrangles and writes it out to that one target. But if we then want to add other targets, we have to modify that application, which becomes more complex and risky. Instead, by breaking up the processes and building them all around Kafka, adding another target for the data is as simple as consuming the transformed data from a Kafka topic.
So, let’s take our transformed data and do something with it! We can use it to drive analytics, and we’ll also see how it can drive applications themselves.
For our analytics, we’re going to land the data to BigQuery, Google’s cloud data warehouse tool. We’ll use another Kafka Connect community connector, one written by WePay to stream data from Kafka topics to BigQuery. You’ll need to set up your Google Cloud Platform (GCP) credentials in a file accessible to the Connect worker(s), and also make sure that the BigQuery project and dataset exist first. Here, I’m using ones called devx-testing and environment_data, respectively:
{
  "name": "sink_gbq_environment-data",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "ENVIRONMENT_DATA",
    "autoCreateTables": "true",
    "autoUpdateSchemas": "true",
    "project": "devx-testing",
    "datasets": ".*=environment_data",
    "keyfile": "/root/creds/gcp_creds.json"
    [...]
Once deployed, we can see data arriving in BigQuery using the bq command-line tool:
$ bq ls environment_data
          tableId          Type    Labels   Time Partitioning
 ------------------------ ------- -------- -------------------
  ENVIRONMENT_DATA          TABLE            DAY
$ bq query 'select * from environment_data.ENVIRONMENT_DATA'
Waiting on bqjob_r5ce1258159e7bf44_000001658f8cfedb_1 ... (0s) Current status: DONE
+------------------+--------------+------------------------+------+------------+----------------------+-----------+-----------+----------------------+---------------+-------+----------+
| STATIONREFERENCE | EAREGIONNAME | EAAREANAME | TOWN | RIVERNAME | LABEL | LAT | LONG | DATETIME | PARAMETERNAME | VALUE | UNITNAME |
+------------------+--------------+------------------------+------+------------+----------------------+-----------+-----------+----------------------+---------------+-------+----------+
| L2404 | North East | North East - Yorkshire | York | River Ouse | Foss Barrier | 53.952443 | -1.078056 | 2018-08-08T16:30:00Z | Water Level | 5.01 | mAOD |
| L2404 | North East | North East - Yorkshire | York | River Ouse | Foss Barrier | 53.952443 | -1.078056 | 2018-08-08T18:15:00Z | Water Level | 5.003 | mAOD |
[...]
There are many ways to work with data in BigQuery: the direct SQL interface, the GUI console, or any of the numerous analytics and visualization tools, including Looker, Tableau, Qlik and Redash. Here, I’ve used Google’s own Data Studio. Connecting to BigQuery is simple, and once the dataset is in Data Studio, it takes only moments to throw some useful visualizations together.
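For example, the kind of query behind such a chart (or run directly through the SQL interface) might look like this. It is illustrative only; the dataset, table and column names are the ones shown in the output above.

-- Illustrative query: water-level readings over time for a single station
SELECT DATETIME, VALUE
  FROM environment_data.ENVIRONMENT_DATA
 WHERE STATIONREFERENCE = 'L2404'
   AND PARAMETERNAME = 'Water Level'
 ORDER BY DATETIME;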
We’ve discussed streaming data to Google BigQuery, but did you know that you can also stream the same transformed data to Google Cloud Storage (GCS), whether for archival or for batch access from other applications (although arguably the latter would be better done by consuming the Kafka topic directly)?
{
  "name": "sink_gcs_environment-data",
  "config": {
    "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
    "topics": "ENVIRONMENT_DATA",
    "gcs.bucket.name": "rmoff-environment-data",
    "gcs.part.size": "5242880",
    "flush.size": "16",
    "gcs.credentials.path": "/root/creds/gcp_creds.json",
    [...]
With this connector running, we now have data streaming to both BigQuery and GCS:
$ gsutil ls gs://rmoff-environment-data/topics/
gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/
$ gsutil ls gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/partition=0/
gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/partition=0/ENVIRONMENT_DATA+0+0000000000.json
gs://rmoff-environment-data/topics/ENVIRONMENT_DATA/partition=0/ENVIRONMENT_DATA+0+0000000016.json