/iot-vehicle-tracking-demo

The Truck Demo used in various talks on Kafka, KSQL and Introduction to Stream Processing

Primary language: Shell. License: Apache License 2.0 (Apache-2.0).

IoT Vehicle Tracking End-to-End Streaming Demo

In this demo we will be ingesting IoT data into a Kafka topic from which it will be analysed using Stream Analytics. To make it a bit more realistic, the data is not directly sent to Kafka from the IoT devices (vehicles) but first sent through an MQTT broker (IoT gateway).

The following diagram shows the setup of the data flow which will be implemented step by step. Of course we will not be using real-life data, but have a program simulating trucks and their driving behaviour.

Alt Image Text

We will see various technologies in action, such as Kafka, MQTT, Kafka Connect, Kafka Streams and ksqlDB.

Preparation

The platform on which the demos run has been generated with the platys toolset, using the platys-modern-data-platform stack.

The generated artefacts are available in the ./docker folder.

The prerequisites for running the platform are Docker and Docker Compose.

The environment is based entirely on Docker containers. To start the multiple containers easily, we are going to use Docker Compose. You need at least 12 GB of RAM available; 16 GB is better.

Start the platform using Docker Compose

First, create the following two environment variables, which export the Public IP address (if a cloud environment) and the Docker Engine (Docker Host) IP address:

export DOCKER_HOST_IP=<docker-host-ip>
export PUBLIC_IP=<public-host-ip>

You can either add them to /etc/environment (without export) to make them persistent or use an .env file inside the docker folder with the two variables.

It is very important that these two are set, otherwise the platform will not run properly.
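Because the platform depends on both variables, a small pre-flight check can catch a missing value before the stack is started. The following is a minimal Python sketch and not part of the demo; the helper name `missing_env_vars` is hypothetical. It only inspects the process environment, so values kept in an .env file inside the docker folder would not be seen by it.

```python
import os

# The two variables the platform expects, as named in the export commands above.
REQUIRED_VARS = ("DOCKER_HOST_IP", "PUBLIC_IP")


def missing_env_vars(env=None):
    """Return the required variable names that are unset or empty in env."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Please set: " + ", ".join(missing))
    else:
        print("DOCKER_HOST_IP and PUBLIC_IP are set")
```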

Now navigate into the docker folder and start docker-compose.

cd ./docker

docker-compose up -d

To show all logs of all containers use

docker-compose logs -f

To show only the logs for some of the containers, for example kafka-connect-1 and kafka-connect-2, use

docker-compose logs -f kafka-connect-1 kafka-connect-2

If you want to stop the platform (all containers), run

docker-compose down

Some services in the docker-compose.yml are optional and can be removed, if you don't have enough resources to start them.

As a final step, add dataplatform as an alias to the /etc/hosts file so that the links used in this document work.

<public-host-ip>		dataplatform

If you do not have permission to do that, you have to use your IP address instead of dataplatform in all the URLs.

Available Services

For a list of available services, navigate to http://dataplatform:80/services.

When a terminal is needed, you can use the Web Terminal available at http://dataplatform:3001/.

Creating the necessary Kafka Topics

The Kafka cluster is configured with auto.create.topics.enable set to false. Therefore we first have to create all the necessary topics, using the kafka-topics command line utility of Apache Kafka.

From a terminal window, use the kafka-topics CLI inside the kafka-1 docker container to create the two raw tracking topics vehicle_tracking_sysA and vehicle_tracking_sysB.

docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic vehicle_tracking_sysA --partitions 8 --replication-factor 3

docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic vehicle_tracking_sysB --partitions 8 --replication-factor 3
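If more topics are added later, the two commands above can be generated from a list of topic names. This is a minimal Python sketch under that assumption; the helper `create_topic_command` is hypothetical, and it only prints the commands rather than running them, so they can be reviewed first. The flags and values are copied from the commands above.

```python
import shlex


def create_topic_command(topic, partitions=8, replication_factor=3):
    """Build the docker exec argument list used above for one topic."""
    return [
        "docker", "exec", "-it", "kafka-1",
        "kafka-topics", "--bootstrap-server", "kafka-1:19092",
        "--create", "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]


if __name__ == "__main__":
    for topic in ("vehicle_tracking_sysA", "vehicle_tracking_sysB"):
        # Print instead of executing, so the commands can be reviewed first.
        print(shlex.join(create_topic_command(topic)))
```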

If you prefer not to work with the CLI, you can also create the Kafka topics using one of the graphical user interfaces, Cluster Manager for Kafka (CMAK) or Apache Kafka HQ (AKHQ).

Check logistics_db PostgreSQL Database

The necessary tables are created automatically when running the stack using Docker Compose. Use the following command in a terminal window to show the content of the driver table:

docker exec -ti postgresql psql -d demodb -U demo -c "SELECT * FROM logistics_db.driver"

Setup Shipment MySQL Database

Create the MySQL table with shipment information:

docker exec -it mysql bash -c 'mysql -u root -pmanager'
CREATE USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';
CREATE USER 'replicator'@'%' IDENTIFIED BY 'replpass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT  ON *.* TO 'debezium';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';

GRANT SELECT, INSERT, UPDATE, DELETE ON sample.* TO sample;

USE sample;

DROP TABLE IF EXISTS shipment;

CREATE TABLE shipment (
                id INT PRIMARY KEY,
                vehicle_id INT,
                target_wkt VARCHAR(2000),
                create_ts timestamp DEFAULT CURRENT_TIMESTAMP,
                update_ts timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
                
INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (1,11, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');     

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (2, 42, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))');         

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (3, 12, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 
                
INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (4, 13, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (5, 14, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (6, 15, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (7, 32, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (8, 48, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 

Simulating Vehicle Tracking Data

For simulating vehicle tracking data, we are going to use a Java program (adapted from Hortonworks and maintained here).

The simulator can produce data to various targets, such as Kafka, MQTT or files. The MQTT and Kafka options are shown below.

Now let's produce the truck events to the MQTT broker running on port 1883. In a terminal window run the following command:

docker run --network host --rm trivadis/iot-truck-simulator '-s' 'MQTT' '-h' $DOCKER_HOST_IP '-p' '1883' '-f' 'JSON' '-vf' '1-49'

Leave this running in the terminal window.


Note: You can shortcut this workshop by skipping Step 1 and publishing directly to Kafka instead of MQTT. In that case, use the following command instead of the one above and then skip to Step 2:

docker run trivadis/iot-truck-simulator '-s' 'KAFKA' '-h' $DOCKER_HOST_IP '-p' '9092' '-f' 'JSON' '-vf' '1-49'


Step 1 - Consume Vehicle Tracking messages from MQTT and send to Kafka

In this part we will show how we can consume the data from the MQTT broker and send it to a Kafka topic. We will be using Kafka Connect for that.

Alt Image Text

Check messages in MQTT

Let's first verify that vehicle tracking messages arrive in the MQTT topic hierarchy truck/+/position.

There are two options for consuming from MQTT:

  • use dockerized MQTT client in the terminal
  • use browser-based HiveMQ Web UI

Using Dockerized MQTT Client

To start consuming from the command line, run the following docker command:

docker run -it --rm --name mqttclient efrecon/mqtt-client sub -h "$DOCKER_HOST_IP" -p 1883 -t "truck/+/position" -v

The consumed messages will show up in the terminal.
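The subscription uses the MQTT single-level wildcard +, which stands for exactly one topic level (here the truck id). The following minimal Python sketch illustrates how such a topic filter is matched against a concrete topic. The function name `mqtt_topic_matches` is hypothetical, and the sketch is simplified: it does not cover the special handling of topics starting with $.

```python
def mqtt_topic_matches(topic_filter, topic):
    """Return True if topic matches topic_filter using MQTT wildcard rules.

    '+' matches exactly one level; '#' matches all remaining levels
    and is only valid as the last level of the filter.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only at the end of the filter.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

With this sketch, truck/11/position matches the filter truck/+/position, whereas truck/11/22/position does not, because + never spans more than one level.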

Using HiveMQ Web UI

To start consuming using the MQTT UI (HiveMQ Web UI), navigate to http://dataplatform:28136 and connect using dataplatform for the Host field, 9101 for the Port field and then click on Connect:

Alt Image Text

When successfully connected, click on Add New Topic Subscription, enter truck/+/position into the Topic field and click Subscribe:

Alt Image Text

As soon as messages are produced to MQTT, you should see them either on the CLI or in the MQTT UI (HiveMQ) as shown below.

Alt Image Text

Alternatively you can also use the MQTT.fx, MQTT Explorer or MQTTLens applications to browse for the messages on the MQTT broker. They are all available for installation on Mac or Windows.

Setup Kafka Connect to bridge between MQTT and Kafka

In order to get the messages from MQTT into Kafka, we will be using the Kafka Connect framework. Kafka Connect is part of the Apache Kafka project and can run various connectors, either as Source or as a Sink Connector, as shown in the following diagram:

Alt Image Text

Connectors are available from Confluent as well as from other, third-party organisations. A good source for connectors is the Confluent Hub. It is not complete, however, and some connectors can only be found in GitHub projects.

There are multiple Kafka Connectors available for MQTT. We can either use the one provided by Confluent Inc. (in preview and available as evaluation license on Confluent Hub) or the one provided as part of the Landoop Stream-Reactor Project available on GitHub. Here we will use the one provided by Landoop.

You can check which connectors are pre-installed by calling the REST API of the Kafka Connect cluster:

curl -XGET http://dataplatform:8083/connector-plugins | jq

We can see that there is no MQTT connector available yet.

Adding the MQTT Kafka Connector

Two instances of the Kafka Connect service are running as part of the Modern Data Platform, kafka-connect-1 and kafka-connect-2.

To add connector implementations without having to copy them into the docker container (or even create a dedicated docker image holding the jar), both connect services are configured to use additional connector implementations from the local folder /etc/kafka-connect/custom-plugins inside the docker container. This folder is mapped as a volume to the plugins/kafka-connect folder on the docker host.

We therefore have to copy the artefacts of the Kafka connectors we want to use into this folder.

Navigate into the connectors sub-folder of plugins/kafka-connect (which is located in the docker folder containing the docker-compose.yml file):

cd $DATAPLATFORM_HOME/plugins/kafka-connect/connectors

and download the kafka-connect-mqtt-4.2.0.zip file (release 4.2.0) from the Landoop Stream-Reactor project.

sudo wget https://github.com/lensesio/stream-reactor/releases/download/4.2.0/kafka-connect-mqtt-4.2.0.zip

Once it is successfully downloaded, uncompress it using the following unzip command and remove the archive:

sudo unzip kafka-connect-mqtt-4.2.0.zip
sudo rm kafka-connect-mqtt-4.2.0.zip

Now let's restart Kafka Connect to pick up the new connector. Make sure to navigate back to the docker folder first, using either cd $DATAPLATFORM_HOME or cd ../../.. (from the connectors folder).

cd $DATAPLATFORM_HOME
docker-compose restart kafka-connect-1 kafka-connect-2

The connector should now be added to the Kafka Connect cluster. You can confirm that by watching the logs of the two containers:

docker-compose logs -f kafka-connect-1 kafka-connect-2

After a while you should see an output similar to the one below with a message that the MQTT connector was added and later that the connector finished starting ...

...
kafka-connect-2             | [2019-06-08 18:01:02,590] INFO Registered loader: PluginClassLoader{pluginLocation=file:/etc/kafka-connect/custom-plugins/kafka-connect-mqtt-1.2.1-2.1.0-all/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2             | [2019-06-08 18:01:02,591] INFO Added plugin 'com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2             | [2019-06-08 18:01:02,591] INFO Added plugin 'com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2             | [2019-06-08 18:01:02,592] INFO Added plugin 'com.datamountaineer.streamreactor.connect.converters.source.JsonResilientConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect-2             | [2019-06-08 18:01:02,592] INFO Added plugin 'com.landoop.connect.sql.Transformation' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
...
kafka-connect-2             | [2019-06-08 18:01:11,520] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
kafka-connect-2             | [2019-06-08 18:01:11,520] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Configure the MQTT Connector

Before we start the connector, let's use a Kafka console consumer to listen on the target topic vehicle_tracking_sysA. We can use the kcat utility (formerly kafkacat), which you can either install locally or use as provided with the Compose stack:

docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_sysA -f "%k - %s\n" -q

Now let's start the connector:

curl -X "POST" "$DOCKER_HOST_IP:8083/connectors" \
     -H "Content-Type: application/json" \
     --data '{
  "name": "mqtt-vehicle-position-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://mosquitto-1:1883",
    "mqtt.topics": "truck/+/position",
    "mqtt.clean.session.enabled":"true",
    "mqtt.connect.timeout.seconds":"30",
    "mqtt.keepalive.interval.seconds":"60",
    "mqtt.qos":"0",
    "kafka.topic":"vehicle_tracking_sysA",
    "confluent.topic.bootstrap.servers": "kafka-1:19092,kafka-2:19093",
    "confluent.topic.replication.factor": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    }
  }'

The truck position messages are sent to the vehicle_tracking_sysA topic and should show up on the kcat consumer immediately.
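If no messages show up, the state of the connector can be checked through the Kafka Connect REST API, which serves a status document under /connectors/<name>/status. The following minimal Python sketch evaluates such a document. The helper names `connector_is_running` and `fetch_status` are hypothetical, and treating a connector without any tasks as "not running" is a choice made for this sketch.

```python
import json
import urllib.request


def connector_is_running(status):
    """Return True if the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = all(task.get("state") == "RUNNING" for task in tasks)
    # A connector without tasks moves no data, so it is treated as not running.
    return connector_ok and bool(tasks) and tasks_ok


def fetch_status(base_url, name):
    """Fetch the status document of a connector from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)
```

Assuming the platform is reachable as described above, a call such as connector_is_running(fetch_status("http://dataplatform:8083", "mqtt-vehicle-position-source")) would report whether the connector created above is healthy.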

If you want to stop the connector, you can again use the REST API:

curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/mqtt-vehicle-position-source"

Monitor connector in Kafka Connect UI

Navigate to the Kafka Connect UI to view the connector in a graphical window.

Step 2 - Using KSQL to Refine the data

In this part we will refine the data and place it in a new topic. The idea here is to have one normalised topic in Avro format, where all the tracking data from both system A and B will be placed, so that further processing can take it from there. For the refinement we will be using ksqlDB.

Alt Image Text

What is ksqlDB?

ksqlDB is an event streaming database purpose-built to help developers create stream processing applications on top of Apache Kafka.

Alt Image Text

Source: Confluent

Connect to ksqlDB engine

Let's connect to the ksqlDB shell

docker exec -it ksqldb-cli ksql http://ksqldb-server-1:8088

Use ksqlDB for displaying messages

Show the available Kafka topics

show topics;

Let's consume the data from the vehicle_tracking_sysA topic, assuming the truck simulator and the MQTT connector are still running.

print 'vehicle_tracking_sysA';

You can also add the keyword from beginning to start consuming at the beginning of the topic.

print 'vehicle_tracking_sysA' from beginning;

You can also use the show commands for showing the other KSQL objects (which we will now create)

show streams;
show tables;
show queries;

Create a Stream and SELECT from it

First drop the stream if it already exists:

DROP STREAM IF EXISTS vehicle_tracking_sysA_s;

Now let's create the ksqlDB Stream

CREATE STREAM IF NOT EXISTS vehicle_tracking_sysA_s
  (mqttTopic VARCHAR KEY,
   timestamp VARCHAR,
   truckId VARCHAR,
   driverId BIGINT,
   routeId BIGINT,
   eventType VARCHAR,
   latitude DOUBLE,
   longitude DOUBLE,
   correlationId VARCHAR)
  WITH (kafka_topic='vehicle_tracking_sysA',
        value_format='JSON');

We are using the JSON value format, as the messages in the topic are JSON-formatted strings.

Let's see the live data by using a SELECT on the Stream with the EMIT CHANGES clause:

SELECT * FROM vehicle_tracking_sysA_s EMIT CHANGES;

This is a so-called Push Query (declared by the EMIT CHANGES clause). A push query is a form of query issued by a client that subscribes to a result as it changes in real-time.

Alt Image Text

Source: Confluent

You should see a continuous stream of events as a result of the SELECT statement, similar to the one shown below:

ksql> SELECT * from vehicle_tracking_sysA_s EMIT CHANGES;
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|MQTTTOPIC                     |TIMESTAMP                     |TRUCKID                       |DRIVERID                      |ROUTEID                       |EVENTTYPE                     |LATITUDE                      |LONGITUDE                     |CORRELATIONID                 |
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|truck/11/position             |1599398981285                 |11                            |17                            |1594289134                    |Normal                        |38.99                         |-93.45                        |-8240058917944842967          |
|truck/42/position             |1599398981846                 |42                            |22                            |1325562373                    |Normal                        |37.15                         |-97.32                        |-8240058917944842967          |
|truck/10/position             |1599398982135                 |10                            |10                            |1962261785                    |Normal                        |38.09                         |-91.44                        |-8240058917944842967          |
|truck/34/position             |1599398982454                 |34                            |16                            |1198242881                    |Normal                        |39.01                         |-93.85                        |-8240058917944842967          |

We have submitted our first simple KSQL statement. Let's now add some analytics to this base statement.

Get info on the stream using the DESCRIBE command

DESCRIBE vehicle_tracking_sysA_s;

or with the additional EXTENDED option

DESCRIBE EXTENDED vehicle_tracking_sysA_s;

Create a new "refined" stream where the data is transformed into Avro

First drop the stream if it already exists:

DROP STREAM IF EXISTS vehicle_tracking_refined_s;

And now create the refined ksqlDB Stream with a CREATE STREAM ... AS SELECT ... statement. We include an additional column source, which holds the system the data is coming from.

CREATE STREAM IF NOT EXISTS vehicle_tracking_refined_s 
  WITH (kafka_topic='vehicle_tracking_refined',
        value_format='AVRO',
        VALUE_AVRO_SCHEMA_FULL_NAME='com.trivadis.avro.VehicleTrackingRefined')
AS SELECT truckId AS ROWKEY
		, 'Tracking_SysA' AS source
		, timestamp
		, AS_VALUE(truckId) AS vehicleId
		, driverId
		, routeId
		, eventType
		, latitude
		, longitude
		, correlationId
FROM vehicle_tracking_sysA_s
PARTITION BY truckId
EMIT CHANGES;
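To make the effect of the statement above concrete, the following minimal Python sketch applies the same mapping to a single System A record: the truck id becomes the key, it is also copied into the value as vehicleId, and a constant source is added. It is an illustration only. The function name `refine_sys_a` is hypothetical, and Avro serialisation and the actual repartitioning in Kafka are left out.

```python
def refine_sys_a(record):
    """Map one System A tracking record to (key, refined value)."""
    # PARTITION BY truckId: the truck id becomes the message key.
    key = record["truckId"]
    value = {
        # Constant marking the system the data is coming from.
        "source": "Tracking_SysA",
        "timestamp": record["timestamp"],
        # AS_VALUE(truckId) AS vehicleId: the key is also kept in the value.
        "vehicleId": record["truckId"],
        "driverId": record["driverId"],
        "routeId": record["routeId"],
        "eventType": record["eventType"],
        "latitude": record["latitude"],
        "longitude": record["longitude"],
        "correlationId": record["correlationId"],
    }
    return key, value
```

The MQTT topic, which is the key of the raw stream, is not selected and therefore does not appear in the refined record.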

To check that the refined topic does in fact hold Avro-formatted data, let's do a plain kcat on the vehicle_tracking_refined topic:

docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_refined

We can see that it is serialised as Avro:

                            Normal�����Q�B@�ףp=
WX��$343671958179690963
���1598125263176��88�6�������
                             Normal���Q���C@��p=
דW��$343671958179690963
% Reached end of topic truck_position_refined [0] at offset 367
���1598125263336��71����ߩ��2Unsafe following distance��Q����B@����(\?W��$343671958179690963
% Reached end of topic truck_position_refined [5] at offset 353
% Reached end of topic truck_position_refined [2] at offset 324
���1598125263526��101���������
                              Normalďż˝=
ףp�E@�R�����V��$343671958179690963
% Reached end of topic truck_position_refined [7] at offset 355

We can use the -s and -r options to specify the Avro Serde and the URL of the schema registry, which makes the output readable:

docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_refined -s value=avro -r http://schema-registry-1:8081
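The schema registry is needed because a value written by a Schema Registry aware serialiser does not carry its schema. In the Confluent wire format, the value starts with a magic byte 0, followed by a 4-byte big-endian schema id and then the Avro binary payload. The following minimal Python sketch splits such a value into its parts. The function name `split_confluent_frame` is hypothetical, and decoding the Avro payload itself is not shown.

```python
import struct


def split_confluent_frame(raw):
    """Split a Schema Registry framed value into (schema_id, avro_payload)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Schema Registry framed message")
    # Bytes 1..4 hold the schema id as an unsigned big-endian integer.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]
```

The schema id is what a consumer such as kcat uses to look up the writer schema in the registry before it decodes the payload.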

You can use the Schema Registry UI on http://dataplatform:28102 to view the Avro Schema created by ksqlDB.

Step 3 - Integrate System B

In this part we will show how to integrate the data from the second vehicle tracking system (System B), where the only integration point available is a set of log files. By tailing these log files we get the information as soon as it arrives, which turns the file source into a streaming data source. We will be using StreamSets Data Collector for the tail operation, because in real life this data collector would have to run on the Vehicle Tracking System itself, or at least on a machine next to it: it needs access to the actual, active file while the application is writing to it. StreamSets even has an Edge option, a down-sized version of the full product that is capable of running on a Raspberry Pi.

Alt Image Text

Let's start a simulator again, but this time one that appends the tracking data to a file:

docker run -v "${PWD}/data-transfer/logs:/out" --rm trivadis/iot-truck-simulator "-s" "FILE" "-f" "CSV" "-d" "1000" "-vf" "50-100" "-es" "2"

Create a StreamSets data flow to tail the file into the Kafka topic vehicle_tracking_sysB.

Alt Image Text

You can import that data flow from ./streamsets/File_to_Kafka.json if you don't want to create it from scratch.

Now let's listen on the new topic

docker exec -ti kcat kcat -b kafka-1 -t vehicle_tracking_sysB -f "%k - %s\n" -q

and then start the flow in StreamSets. You should see the data from the file arriving as a stream of vehicle tracking data.

Field[STRING:97] - {"text":"SystemB,1599556302227,97,21,1325712174,Normal,37.7:-92.61,4331001611104251967"}
Field[STRING:97] - {"text":"SystemB,1599556302994,97,21,1325712174,Normal,37.6:-92.74,4331001611104251967"}
Field[STRING:97] - {"text":"SystemB,1599556303791,97,21,1325712174,Normal,37.51:-92.89,4331001611104251967"}

The first part (before the dash) shows the content of the Kafka key, generated in the Expression Evaluator component in StreamSets. The second part represents the Kafka value. In contrast to System A, this system delivers its data in CSV format. Additionally, the system name is included, and latitude and longitude are sent as a single string value in which the two numbers are delimited by a colon character (:).

Step 4 - Refinement of data from System B into same topic as above

In this part we will do the refinement on the raw data from System B and place it into the same topic vehicle_tracking_refined as used in step 2.

Alt Image Text

First let's create the Stream on the raw data topic:

DROP STREAM IF EXISTS vehicle_tracking_sysB_s;
CREATE STREAM IF NOT EXISTS vehicle_tracking_sysB_s 
  (ROWKEY VARCHAR KEY,
   system VARCHAR,
   timestamp VARCHAR, 
   vehicleId VARCHAR, 
   driverId BIGINT, 
   routeId BIGINT,
   eventType VARCHAR,
   latLong VARCHAR,
   correlationId VARCHAR)
  WITH (kafka_topic='vehicle_tracking_sysB',
        value_format='DELIMITED');

System B delivers the latitude and longitude in one field as a string, with the two values delimited by a colon character.

DESCRIBE vehicle_tracking_sysB_s;
DESCRIBE vehicle_tracking_refined_s;

Now we can use the INSERT statement to write the data into the vehicle_tracking_refined_s stream we have created in step 2. We have to make sure that the structure matches (this is the refinement we perform), which in this case means providing the right value for the source column as well as splitting the latLong value into a latitude and a longitude value:

INSERT INTO vehicle_tracking_refined_s 
SELECT ROWKEY
    , 'Tracking_SysB' AS source
	, timestamp
	, vehicleId
	, driverId
	, routeId
	, eventType
	, CAST(split(latLong,':')[1] as DOUBLE) as latitude
	, CAST(split(latLong,':')[2] AS DOUBLE) as longitude
	, correlationId
FROM vehicle_tracking_sysB_s
EMIT CHANGES;
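The refinement performed by the INSERT statement can be illustrated on a single System B line. The following is a minimal Python sketch, assuming the function receives the plain CSV text of a message; the function name `refine_sys_b` is hypothetical. Note that the ksqlDB statement addresses the two parts of the split result with the indexes 1 and 2, whereas Python lists start at index 0.

```python
def refine_sys_b(key, line):
    """Map one System B CSV line to (key, refined value)."""
    (system, timestamp, vehicle_id, driver_id, route_id,
     event_type, lat_long, correlation_id) = line.split(",")
    # latLong holds both coordinates, delimited by a colon.
    parts = lat_long.split(":")
    value = {
        # Constant source value; the system field of the line itself is not used.
        "source": "Tracking_SysB",
        "timestamp": timestamp,
        "vehicleId": vehicle_id,
        "driverId": int(driver_id),
        "routeId": int(route_id),
        "eventType": event_type,
        "latitude": float(parts[0]),
        "longitude": float(parts[1]),
        "correlationId": correlation_id,
    }
    return key, value
```

The resulting value has the same fields as the refined System A data, which is what allows both systems to share the vehicle_tracking_refined topic.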

Demo 5 - Pull Query on Vehicle Tracking Info ("Device Shadow")

With the vehicle position data from both source systems normalized into the vehicle_tracking_refined topic and available in ksqlDB through the vehicle_tracking_refined_s stream object, is it possible to query for the latest position of a given vehicle?

Alt Image Text

In ksqlDB such queries are called pull queries, in contrast to the streaming queries we have seen so far, known as push queries (using the EMIT CHANGES clause). A pull query is a form of query issued by a client that retrieves a result as of "now", like a query against a traditional RDBMS.

Alt Image Text

Source: Confluent

So let's do a SELECT on the stream, restricting on the vehicleId without an EMIT CHANGES

SELECT * FROM vehicle_tracking_refined_s WHERE vehicleId = '42';

This query will return all the messages collected so far for vehicle 42.

ksql> SELECT * FROM vehicle_tracking_refined_s WHERE vehicleId = '42'
>;
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|ROWKEY   |SOURCE   |TIMESTAMP|VEHICLEID|DRIVERID |ROUTEID  |EVENTTYPE|LATITUDE |LONGITUDE|CORRELATI|
|         |         |         |         |         |         |         |         |         |ONID     |
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|42       |Tracking_|166508600|42       |11       |196226178|Normal   |38.65    |-90.2    |-60501605|
|         |SysA     |2589     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508600|42       |11       |196226178|Normal   |39.1     |-89.74   |-60501605|
|         |SysA     |5857     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508600|42       |11       |196226178|Normal   |39.84    |-89.63   |-60501605|
|         |SysA     |9608     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508601|42       |11       |196226178|Normal   |40.38    |-89.17   |-60501605|
|         |SysA     |3558     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508601|42       |11       |196226178|Normal   |40.76    |-88.77   |-60501605|
|         |SysA     |6637     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508601|42       |11       |196226178|Normal   |41.11    |-88.42   |-60501605|
|         |SysA     |9778     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508602|42       |11       |196226178|Normal   |41.48    |-88.07   |-60501605|
|         |SysA     |2997     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508602|42       |11       |196226178|Normal   |41.87    |-87.67   |-60501605|
|         |SysA     |6737     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
|42       |Tracking_|166508603|42       |11       |196226178|Normal   |41.87    |-87.67   |-60501605|
|         |SysA     |0128     |         |         |5        |         |         |         |965346141|
|         |         |         |         |         |         |         |         |         |45       |
Query Completed
Query terminated

Note that the query terminates because it is a pull query (similar to a database query, which ends at the end of the result set).

Could we also use a query to just return the latest data point per vehicle?

For that we can use a GROUP BY on vehicleId and apply latest_by_offset to each field.

DROP TABLE IF EXISTS vehicle_tracking_refined_t DELETE TOPIC;

CREATE TABLE IF NOT EXISTS vehicle_tracking_refined_t
WITH (kafka_topic = 'vehicle_tracking_refined_t')
AS
SELECT vehicleId
       , latest_by_offset(driverId)	   driverId
		, latest_by_offset(source)			source
		, latest_by_offset(eventType)		eventType
		, latest_by_offset(latitude)		latitude
		, latest_by_offset(longitude)		longitude
FROM vehicle_tracking_refined_s
GROUP BY vehicleId
EMIT CHANGES;

This table uses the vehicleId as the primary key (due to the GROUP BY) and materialises all values as the latest one from the aggregation.
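The behaviour of such a table can be pictured as a dictionary keyed by vehicleId that is overwritten as new events arrive. The following minimal Python sketch shows the idea and is not how ksqlDB implements it. The function name `latest_by_vehicle` is hypothetical, the events are assumed to be passed in offset order, and skipping missing values is an assumption about the default behaviour of latest_by_offset.

```python
def latest_by_vehicle(events):
    """Keep, per vehicleId, the most recently seen values of selected fields."""
    fields = ("driverId", "source", "eventType", "latitude", "longitude")
    table = {}
    for event in events:  # events are assumed to arrive in offset order
        row = table.setdefault(event["vehicleId"], {})
        for field in fields:
            # Skip missing values so that an earlier non-null value is kept.
            if event.get(field) is not None:
                row[field] = event[field]
    return table
```

Looking up one key in the returned dictionary corresponds to a pull query against the table with a WHERE clause on vehicleId.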

DESCRIBE vehicle_tracking_refined_t;

A describe on the table shows that this primary key is of type STRING:

ksql> DESCRIBE vehicle_tracking_refined_t;

Name                 : VEHICLE_TRACKING_REFINED_T
 Field     | Type
--------------------------------------------
 VEHICLEID | VARCHAR(STRING)  (primary key)
 DRIVERID  | BIGINT
 SOURCE    | VARCHAR(STRING)
 EVENTTYPE | VARCHAR(STRING)
 LATITUDE  | DOUBLE
 LONGITUDE | DOUBLE
--------------------------------------------

So to test the pull query, we have to compare vehicleId against a string literal, otherwise an error is shown:

SELECT * FROM vehicle_tracking_refined_t WHERE vehicleId = '42';

But we could also change the CREATE TABLE statement to CAST the vehicleId into a BIGINT:

DROP TABLE IF EXISTS vehicle_tracking_refined_t DELETE TOPIC;

CREATE TABLE IF NOT EXISTS vehicle_tracking_refined_t
WITH (kafka_topic = 'vehicle_tracking_refined_t')
AS
SELECT CAST(vehicleId AS BIGINT)			vehicleId
       , latest_by_offset(driverId)	   driverId
		, latest_by_offset(source)			source
		, latest_by_offset(eventType)		eventType
		, latest_by_offset(latitude)		latitude
		, latest_by_offset(longitude)		longitude
FROM vehicle_tracking_refined_s
GROUP BY CAST(vehicleId AS BIGINT)
EMIT CHANGES;

Now we can use it with an integer:

SELECT * FROM vehicle_tracking_refined_t WHERE vehicleId = 42;

Demo 6 - Using Kafka Connect to write data to Redis and Minio

Alt Image Text

Using Kafka Connect to write to Redis

For transporting messages from Kafka to Redis, in this workshop we will be using Kafka Connect. We could also use StreamSets or Apache NiFi to achieve the same result.

Luckily, there are multiple Kafka Sink Connectors available for writing to Redis. We can either use the one provided by Confluent Inc. (which is part of Confluent Enterprise but available with an evaluation license on Confluent Hub) or the Redis Kafka Connector (Source and Sink) provided by Redis Inc., which is free to use and available on GitHub. We will be using the latter one here. It is already installed as part of the platform.

Configure the Redis Sink Connector

For creating an instance of the connector over the API, you can either use a REST client or the Linux curl command line utility, which should be available on the Docker host. Curl is what we are going to use here.

Create a folder scripts (if it does not yet exist) and navigate into the folder.

mkdir scripts
cd scripts

In the scripts folder, create a file start-redis.sh and add the code below.

#!/bin/bash

echo "removing Redis Sink Connector"

curl -X "DELETE" "http://${DOCKER_HOST_IP}:8083/connectors/redis-sink"

echo "creating Redis Sink Connector"

curl -X PUT \
  http://${DOCKER_HOST_IP}:8083/connectors/redis-sink/config \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
    "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
    "tasks.max": "1",
    "redis.uri": "redis://redis-1:6379",
    "redis.insecure": "true",
    "redis.password": "abc123!",
    "redis.command": "HSET",
    "topics": "vehicle_tracking_refined",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-1:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}'

The script first removes the Redis connector, if it already exists, and then creates it from scratch.
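The same REST call can also be built programmatically. The following Python sketch only constructs the PUT request against /connectors/<name>/config (the Kafka Connect endpoint that creates the connector or updates it if it exists) and does not send it. The helper name build_connector_request and the host value are illustrative assumptions, and only a subset of the configuration is shown.

```python
import json
import urllib.request


def build_connector_request(host: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the PUT request that creates or updates a connector."""
    url = f"http://{host}:8083/connectors/{name}/config"
    return urllib.request.Request(
        url,
        # Serialising a dict guarantees a well-formed JSON body.
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Hypothetical host; in the workshop this would be the value of DOCKER_HOST_IP.
    request = build_connector_request("localhost", "redis-sink", {
        "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
        "tasks.max": "1",
        "topics": "vehicle_tracking_refined",
    })
    print(request.get_method(), request.full_url)
    # To send it against a running Kafka Connect instance:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status)
```

Generating the body from a dictionary avoids hand-written JSON mistakes such as a stray closing brace.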

Also create a separate script stop-redis.sh for stopping the connector with the following content:

#!/bin/bash

echo "removing Redis Sink Connector"

curl -X "DELETE" "http://${DOCKER_HOST_IP}:8083/connectors/redis-sink"

Make sure that both scripts are executable

sudo chmod +x start-redis.sh
sudo chmod +x stop-redis.sh

Start the Redis connector

Finally let's start the connector by running the start-redis script.

./scripts/start-redis.sh

The connector will receive the messages from the Kafka topic and write them to Redis, using the Kafka key as the key in Redis. We have opted for using the Hash data structure for the value ("redis.command": "HSET").

Let's see if we have values in Redis by using the Redis CLI

docker exec -ti redis-1 redis-cli

Let's authenticate and then use the KEYS * command to get all keys

AUTH abc123!
KEYS *

We can see a key for each vehicle.

127.0.0.1:6379> KEYS *
 1) "vehicle_tracking_refined:\"46\""
 2) "vehicle_tracking_refined:\"18\""
 3) "vehicle_tracking_refined:\"25\""
 4) "vehicle_tracking_refined:80"
 5) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.3"
 6) "vehicle_tracking_refined:\"11\""
 7) "vehicle_tracking_refined:\"16\""
 8) "vehicle_tracking_refined:\"29\""
 9) "vehicle_tracking_refined:\"45\""
10) "vehicle_tracking_refined:\"38\""
11) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.7"
12) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.0"
13) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.4"
14) "vehicle_tracking_refined:97"
15) "vehicle_tracking_refined:69"
16) "vehicle_tracking_refined:74"
17) "vehicle_tracking_refined:\"30\""
18) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.1"
19) "vehicle_tracking_refined:\"40\""
20) "vehicle_tracking_refined:51"
21) "vehicle_tracking_refined:\"14\""
22) "vehicle_tracking_refined:\"43\""
23) "vehicle_tracking_refined:90"
24) "vehicle_tracking_refined:\"20\""
25) "vehicle_tracking_refined:\"26\""
26) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.6"
27) "com.redis.kafka.connect.sink.offset.vehicle_tracking_refined.5"
28) "vehicle_tracking_refined:\"33\""
29) "vehicle_tracking_refined:85"
30) "vehicle_tracking_refined:\"10\""
31) "vehicle_tracking_refined:60"
32) "vehicle_tracking_refined:\"35\""
33) "vehicle_tracking_refined:53"

Let's see the content for vehicle 53 using the HGETALL command

127.0.0.1:6379> HGETALL "vehicle_tracking_refined:53"
 1) "SOURCE"
 2) "Tracking_SysB"
 3) "TIMESTAMP"
 4) "1688321889262"
 5) "VEHICLEID"
 6) "53"
 7) "DRIVERID"
 8) "20"
 9) "ROUTEID"
10) "1090292248"
11) "EVENTTYPE"
12) "Unsafe following distance"
13) "LATITUDE"
14) "40.7"
15) "LONGITUDE"
16) "-89.52"
17) "CORRELATIONID"
18) "5823429444287523"
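The HGETALL reply is a flat list that alternates between field names and values. If you read it from a client program, it is usually more convenient as a dictionary. A small Python sketch (the helper name hgetall_to_dict is made up for illustration; many client libraries already do this conversion for you):

```python
from typing import Dict, List


def hgetall_to_dict(reply: List[str]) -> Dict[str, str]:
    """Turn a flat [field, value, field, value, ...] reply into a dict."""
    if len(reply) % 2 != 0:
        raise ValueError("expected an even number of elements (field/value pairs)")
    # Even positions hold the field names, odd positions the values.
    return dict(zip(reply[0::2], reply[1::2]))


if __name__ == "__main__":
    # First entries of the reply shown above for vehicle 53.
    reply = ["SOURCE", "Tracking_SysB", "TIMESTAMP", "1688321889262",
             "VEHICLEID", "53", "DRIVERID", "20"]
    print(hgetall_to_dict(reply))
```

Note that all values are returned as strings, so numeric fields such as LATITUDE have to be converted by the reader.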

Using Kafka Connect to write to Object Storage (MinIO)

The Kafka Connect connector for working with S3-compliant object storage is the Confluent Kafka Connect S3 connector. It is part of the data platform, which you can verify by listing the installed connector plugins:

curl -X "GET" "$DOCKER_HOST_IP:8083/connector-plugins" | jq

So all we have to do is create another script with the REST call to set up the connector.

In the scripts folder, create a file start-s3.sh and add the code below.

#!/bin/bash

echo "removing S3 Sink Connector"

curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/s3-sink"

echo "creating Confluent S3 Sink Connector"

curl -X PUT \
  http://${DOCKER_HOST_IP}:8083/connectors/s3-sink/config \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "partition.duration.ms": "3600000",
      "flush.size": "2000",
      "topics": "vehicle_tracking_refined",
      "tasks.max": "1",
      "timezone": "Europe/Zurich",
      "locale": "en",
      "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
      "timestamp.extractor":"RecordField",
      "timestamp.field":"TIMESTAMP",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
      "s3.region": "us-east-1",
      "s3.bucket.name": "logistics-bucket",
      "s3.part.size": "5242880",
      "store.url": "http://minio-1:9000",
      "topics.dir": "refined",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry-1:8081",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}'

We configure the connector to read the topic vehicle_tracking_refined and write messages to the bucket named logistics-bucket.

Also create a separate script stop-s3.sh for just stopping the connector and add the following code:

#!/bin/bash

echo "removing S3 Sink Connector"

curl -X "DELETE" "$DOCKER_HOST_IP:8083/connectors/s3-sink"

Make sure that both scripts are executable

sudo chmod +x start-s3.sh
sudo chmod +x stop-s3.sh

Create the Bucket in Object Storage

Before we can start the script, we have to make sure that the bucket logistics-bucket exists in Object Storage.

docker exec -ti minio-mc mc mb minio-1/logistics-bucket

Start the S3 connector

Finally let's start the connector by running the start-s3 script.

./scripts/start-s3.sh

You have to make sure that either the ingestion into vehicle_tracking_refined is still working or that you have existing messages in the topic vehicle_tracking_refined.

As soon as the connector picks up messages, they should start to appear in the logistics-bucket bucket in MinIO.

You should see a new folder refined (the value configured in topics.dir) with a sub-folder vehicle_tracking_refined representing the topic. Inside this folder, the objects are organised into further sub-folders as determined by the configured partitioner.
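To get a feeling for where an object ends up in the bucket, the following Python sketch derives an object key from the connector settings. It is based on my understanding of the Confluent S3 sink naming scheme (topics.dir, topic, time-based folders from the HourlyPartitioner, then topic+partition+start offset) and should be verified against what you actually see in MinIO. The function name and the sample time are assumptions.

```python
from datetime import datetime


def s3_object_key(topics_dir: str, topic: str, partition: int, start_offset: int,
                  local_time: datetime, extension: str = "avro") -> str:
    """Sketch of the object key for one flushed file (assumed naming scheme)."""
    # Assumption: the HourlyPartitioner creates year=/month=/day=/hour= folders,
    # based on the record time converted to the configured time zone.
    hour_path = local_time.strftime("year=%Y/month=%m/day=%d/hour=%H")
    # The file name carries topic, Kafka partition and zero-padded start offset.
    file_name = f"{topic}+{partition}+{start_offset:010d}.{extension}"
    return f"{topics_dir}/{topic}/{hour_path}/{file_name}"


if __name__ == "__main__":
    # Hypothetical record time, already expressed in the connector's time zone.
    print(s3_object_key("refined", "vehicle_tracking_refined", 0, 0,
                        datetime(2023, 7, 2, 20, 18)))
```

The file name part matches the object downloaded further below (vehicle_tracking_refined+0+0000000000.avro).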

Alt Image Text

In each folder you will find multiple objects, all with some messages from Kafka.

Alt Image Text

Let's see the content of one of the objects. We cannot do that directly from the MinIO UI; we first have to download it and then open it with a local editor. To download an object, select the object and then click on the Download action to the right.

The content of the object should be similar to the one shown below

Downloads % cat vehicle_tracking_refined+0+0000000000.avro
Objavro.schemaďż˝
               {"type":"record","name":"VehicleTrackingRefined","namespace":"com.trivadis.avro","fields":[{"name":"SOURCE","type":["null","string"],"default":null},{"name":"TIMESTAMP","type":["null","long"],"default":null},{"name":"VEHICLEID","type":["null","long"],"default":null},{"name":"DRIVERID","type":["null","long"],"default":null},{"name":"ROUTEID","type":["null","long"],"default":null},{"name":"EVENTTYPE","type":["null","string"],"default":null},{"name":"LATITUDE","type":["null","double"],"default":null},{"name":"LONGITUDE","type":["null","double"],"default":null},{"name":"CORRELATIONID","type":["null","string"],"default":null}],"connect.version":2,"connect.name":"com.trivadis.avro.VehicleTrackingRefined"}avro.codenull�O;O��f,Φ/[+���|Tracking_SysA����bP*����
                                                       Normal�G�z�D@{�G�V�&6906439778495426077Tracking_SysA����bP*����
                                                                                                                     Normal�z�GaD@�z�G1V�&6906439778495426077Tracking_SysA����bP*����
Normalq=
ף0D@{�G�JV�&6906439778495426077Tracking_SysA����bP*����
                                                      Normal�Q���C@���QhV�&6906439778495426077Tracking_SysA����bP*����
                                                                                                                     Normal����̌C@���(\oV�&6906439778495426077Tracking_SysA����bP*����
Normal33333SC@����̌V�&6906439778495426077Tracking_SysA���bP*����
ףp�D@���G�z�V��&6906439778495426077��Tracking_SysA������b�P�*�������
...

Demo 7 - Investigate Driving Behaviour

In this part we will be using ksqlDB and, as alternative solutions, Kafka Streams or Faust to analyse the data in the refined topic vehicle_tracking_refined.

Alt Image Text

Now with the data from both systems integrated, let's work with it!

In this new stream we are only interested in the messages where the eventType is not normal. First let's create a SELECT statement which returns the right result, using the ksqlDB CLI:

SELECT * FROM vehicle_tracking_refined_s 
WHERE eventType != 'Normal'
EMIT CHANGES;

Now let's create a new stream with that information.

DROP STREAM IF EXISTS problematic_driving_s;

CREATE STREAM IF NOT EXISTS problematic_driving_s
  WITH (kafka_topic='problematic_driving',
        value_format='AVRO',
        partitions=8)
AS 
SELECT * 
FROM vehicle_tracking_refined_s
WHERE eventtype != 'Normal';

We can see that the stream now only contains the messages filtered down to the relevant ones:

SELECT * FROM problematic_driving_s
EMIT CHANGES;

We can also see the same information by directly getting the data from the underlying Kafka topic problematic_driving:

docker exec -ti kcat kcat -b kafka-1 -t problematic_driving -s value=avro -r http://schema-registry-1:8081

Alternative using Kafka Streams

The same logic can also be implemented using Kafka Streams. In the folder java you will find the Kafka Streams project kafka-streams-vehicle-tracking with the implementation. The value we consume from the vehicle_tracking_refined topic is serialized as Avro. Therefore we configure Kafka Streams to use the SpecificAvroSerde.

package com.trivadis.kafkastreams;

import com.trivadis.avro.VehicleTrackingRefined;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class DetectProblematicDriving {

	static final String VEHICLE_TRACKING_REFINED_STREAM = "vehicle_tracking_refined";
	static final String PROBLEMATIC_DRIVING_STREAM = "problematic_driving-kstreams";

	public static void main(final String[] args) {
		final String applicationId = "test";
		final String clientId = "test";
		final String bootstrapServer = "dataplatform:9092";
		final String schemaRegistryUrl = "http://dataplatform:8081";
		final boolean cleanup = false;
		final String stateDirPath = "C:\\tmp\\kafka-streams";

		final KafkaStreams streams = buildFeed(applicationId, clientId, bootstrapServer, schemaRegistryUrl, stateDirPath);

		if (cleanup) {
			streams.cleanUp();
		}
		streams.start();

		// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				streams.close();
			}
		}));
	}

	private static KafkaStreams buildFeed(final String applicationId, final String clientId, final String bootstrapServers, final String schemaRegistryUrl,
										  final String stateDir) {

		final Properties streamsConfiguration = new Properties();

		// Give the Streams application a unique name. The name must be unique in the
		// Kafka cluster
		// against which the application is run.
		streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
		streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);

		// Where to find Kafka broker(s).
		streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

		// Where to find the Confluent schema registry instance(s)
		streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

		// Specify default (de)serializers for record keys and for record values.
		streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
		streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
		streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		// Records should be flushed every 10 seconds. This is less than the default
		// in order to keep this example interactive.
		streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

		// If Confluent monitoring interceptors are on the classpath,
		// then the producer and consumer interceptors are added to the
		// streams application.
		// MonitoringInterceptorUtils.maybeConfigureInterceptorsStreams(streamsConfiguration);


		final StreamsBuilder builder = new StreamsBuilder();

		// read the source stream (keyed by objectId)
		final KStream<String, VehicleTrackingRefined> vehicleTracking = builder.stream(VEHICLE_TRACKING_REFINED_STREAM);

		vehicleTracking.peek((k,v) -> System.out.println("vehicleTracking.peek(...) : " + k + " : " + v));

		// filter out all events where eventType equals "Normal"
		final KStream<String, VehicleTrackingRefined> vehicleTrackingFiltered = vehicleTracking.filterNot((k,v) -> "Normal".equals (v.getEVENTTYPE().toString()));

		// Send the Matches to the Kafka Topic
		vehicleTrackingFiltered.to(PROBLEMATIC_DRIVING_STREAM);

		// read the driver
		//final KTable<String, Driver> driver = builder.table(DRIVER_STREAM);

		// Left Join Positions Mecomo Raw with Barge to get the barge id
		//KStream<String, PositionMecomo> positionsMecomo  =  positionsMecomoRaw.leftJoin(barge,
		//		(leftValue, rightValue) -> createFrom(leftValue, (rightValue != null ? rightValue.getId() : -1) ),
		//		Joined.<String, PositionMecomoRaw, Barge>keySerde(Serdes.String())
		//);

		return new KafkaStreams(builder.build(), streamsConfiguration);
	}
}

To avoid conflicts with the ksqlDB version, the Kafka Streams implementation publishes to its own topic problematic_driving-kstreams. So let's create that first

docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic problematic_driving-kstreams --partitions 8 --replication-factor 3

Now you can run the Kafka Streams application and you should see the problematic driving events in the problematic_driving-kstreams topic

docker exec -ti kcat kcat -b kafka-1 -t problematic_driving-kstreams -s value=avro -r http://schema-registry-1:8081 -o end -q

Alternative using Faust

The same logic can also be implemented using Faust. In the folder python you will find the Faust project faust-vehicle-tracking with the implementation. The value we have available from the vehicle_tracking_refined topic is serialized as Avro. Avro is supported by Faust, but the current implementation works on JSON. Therefore we first convert the Avro messages into JSON using ksqlDB.

CREATE STREAM vehicle_tracking_refined_json_s
  WITH (kafka_topic='vehicle_tracking_refined_json',
        value_format='JSON', 
        partitions=8, replicas=3)
AS 
SELECT * 
FROM vehicle_tracking_refined_s
EMIT CHANGES;        
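Once converted, each message value is a plain JSON object with upper-case field names. Before wiring up Faust, the filter logic can be tried on a few messages with nothing but the standard library. This is only a sketch: the function name is made up and the sample messages are hand-written, using the field names seen in the Redis output earlier.

```python
import json
from typing import Iterable, Iterator


def problematic_events(raw_messages: Iterable[str]) -> Iterator[dict]:
    """Yield only the events whose EVENTTYPE is not 'Normal'."""
    for raw in raw_messages:
        event = json.loads(raw)
        if event.get("EVENTTYPE") != "Normal":
            yield event


if __name__ == "__main__":
    # Hand-written sample messages (assumed shape of the JSON topic).
    messages = [
        '{"SOURCE": "Tracking_SysB", "VEHICLEID": 53, "DRIVERID": 20, '
        '"EVENTTYPE": "Unsafe following distance", "LATITUDE": 40.7, "LONGITUDE": -89.52}',
        '{"SOURCE": "Tracking_SysA", "VEHICLEID": 42, "DRIVERID": 11, '
        '"EVENTTYPE": "Normal", "LATITUDE": 41.87, "LONGITUDE": -87.67}',
    ]
    for event in problematic_events(messages):
        print(event["VEHICLEID"], event["EVENTTYPE"])
```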

You can install Faust either via the Python Package Index (PyPI) or from source.

pip install -U faust

Faust also defines a group of setuptools extensions that can be used to install Faust and the dependencies for a given feature. Find more about it here.

In your home directory, create a folder faust-vehicle-tracking with a sub-folder src and navigate into the src folder

cd
mkdir -p faust-vehicle-tracking/src
cd faust-vehicle-tracking/src

Create a file __main__.py and add the following code

from src.app import app

app.main()

Create a file app.py and add the following code

import faust

kafka_brokers = ['dataplatform:29092']

# convenience func for launching the app
def main() -> None:
    app.main()

app = faust.App('vehicle-tracking-app', broker=kafka_brokers)

# VehiclePosition schema (fields of the JSON messages we consume)
class VehiclePosition(faust.Record, validation=True, serializer='json'):
    TIMESTAMP: str
    VEHICLEID: str
    DRIVERID: int
    ROUTEID: int
    EVENTTYPE: str
    LATITUDE: float
    LONGITUDE: float


rawVehiclePositionTopic = app.topic('vehicle_tracking_refined_json', value_type= VehiclePosition)
problematicDrivingTopic = app.topic('problematic_driving_faust', value_type= VehiclePosition)


@app.agent(rawVehiclePositionTopic)
async def process(positions):
    async for position in positions:
        print(f'Position for {position.VEHICLEID}')
        
        if position.EVENTTYPE != 'Normal': 
            await problematicDrivingTopic.send(value=position)   

Create the new topic problematic_driving_faust to which the dangerous driving behaviour will be published:

docker exec -ti kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic problematic_driving_faust --partitions 8 --replication-factor 3

Now you can run the Faust application. From the src folder, move one level up and start the worker

cd ..
python3 -m src worker -l info

and you should see the problematic driving events in the problematic_driving_faust topic

docker exec -ti kcat kcat -b kafka-1 -t problematic_driving_faust -o end -q

Demo 8 - Materialize Driver Information ("static information")

In this part of the workshop, we are integrating the driver information from the Dispatching system into a Kafka topic, so it is available for enrichments of data streams.

Alt Image Text

We will use the Kafka Connect JDBC Source Connector for periodically retrieving the data from the database table and publish it to the Kafka topic logisticsdb_driver. The connector is pre-installed as part of the dataplatform.

Instead of configuring the connector through the REST API, as we have seen before with the MQTT connector, we will use the ksqlDB integration with the CREATE CONNECTOR command.

First let's create the Kafka topic logisticsdb_driver.

docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic logisticsdb_driver --partitions 8 --replication-factor 3 --config cleanup.policy=compact --config segment.ms=100 --config delete.retention.ms=100 --config min.cleanable.dirty.ratio=0.001

Now create the connector in ksqlDB (CREATE CONNECTOR)

DROP CONNECTOR jdbc_logistics_sc;
CREATE SOURCE CONNECTOR jdbc_logistics_sc WITH (
    "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
    "tasks.max" = '1',
    "connection.url" = 'jdbc:postgresql://postgresql/demodb?user=demo&password=abc123!',
    "mode" = 'timestamp',
    "timestamp.column.name" = 'last_update',
    "schema.pattern" = 'dispatching',
    "table.whitelist" = 'driver',
    "validate.non.null" = 'false',
    "topic.prefix" = 'logisticsdb_',
    "poll.interval.ms" = '10000',
    "key.converter" = 'org.apache.kafka.connect.converters.LongConverter',
    "key.converter.schemas.enable" = 'false',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'false',
    "transforms" = 'createKey,extractInt',
    "transforms.createKey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createKey.fields" = 'id',
    "transforms.extractInt.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractInt.field" = 'id'
    );
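The two transforms in the connector configuration are what turn the id column into the Kafka message key. The Python sketch below mimics the effect of ValueToKey followed by ExtractField$Key on a single record. It is a simplified model for illustration, not the Kafka Connect implementation, and the sample driver row is made up.

```python
from typing import Any, Dict, List, Optional, Tuple

Record = Tuple[Optional[Any], Dict[str, Any]]  # (key, value)


def value_to_key(record: Record, fields: List[str]) -> Record:
    """Mimic ValueToKey: replace the key with a struct built from value fields."""
    _, value = record
    return ({name: value[name] for name in fields}, value)


def extract_field_from_key(record: Record, field: str) -> Record:
    """Mimic ExtractField$Key: replace the struct key with one of its fields."""
    key, value = record
    return (key[field], value)


def apply_transforms(record: Record) -> Record:
    """Apply the chain 'createKey,extractInt' from the connector config."""
    return extract_field_from_key(value_to_key(record, ["id"]), "id")


if __name__ == "__main__":
    # Hypothetical row as read from the driver table; the key is empty at first.
    row = (None, {"id": 11, "first_name": "Jane", "last_name": "Doe", "available": "Y"})
    print(apply_transforms(row))
```

Having the plain id as key is what allows the compacted topic to keep one record per driver and the ksqlDB table to declare id as its primary key.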

We can see that all the drivers from the driver table have been produced into the logisticsdb_driver topic by using kcat:

docker exec -ti kcat kcat -b kafka-1 -t logisticsdb_driver -o beginning

You can also use the PRINT command from ksqlDB instead

PRINT logisticsdb_driver FROM BEGINNING;

Back in the ksqlDB console, create a ksqlDB table on the topic

DROP TABLE IF EXISTS driver_t;

CREATE TABLE IF NOT EXISTS driver_t (id BIGINT PRIMARY KEY,
   first_name VARCHAR,  
   last_name VARCHAR,  
   available VARCHAR, 
   birthdate VARCHAR)  
  WITH (kafka_topic='logisticsdb_driver', 
        value_format='JSON');

Then query the table with a push query, so that changes are shown as they arrive:

SELECT * FROM driver_t EMIT CHANGES;

Now perform an update on one of the drivers in the PostgreSQL database (original source):

docker exec -ti postgresql psql -d demodb -U demo -c "UPDATE logistics_db.driver SET available = 'N', last_update = CURRENT_TIMESTAMP  WHERE id = 11"
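The UPDATE also sets last_update, because the connector runs in timestamp mode and only picks up rows whose timestamp column is newer than the last one it has seen. The following Python sketch models that polling logic in a strongly simplified form. The real connector does this with SQL queries against the database; the function name and sample rows are assumptions.

```python
from datetime import datetime
from typing import Any, Dict, List, Tuple


def poll_changes(rows: List[Dict[str, Any]],
                 last_seen: datetime) -> Tuple[List[Dict[str, Any]], datetime]:
    """Return the rows changed since last_seen and the new offset to remember."""
    changed = sorted(
        (row for row in rows if row["last_update"] > last_seen),
        key=lambda row: row["last_update"],
    )
    # If nothing changed, the remembered timestamp stays the same.
    new_offset = changed[-1]["last_update"] if changed else last_seen
    return changed, new_offset


if __name__ == "__main__":
    # Hypothetical table content after the UPDATE on driver 11.
    table = [
        {"id": 10, "available": "Y", "last_update": datetime(2023, 7, 1, 8, 0)},
        {"id": 11, "available": "N", "last_update": datetime(2023, 7, 2, 20, 30)},
    ]
    changed, offset = poll_changes(table, last_seen=datetime(2023, 7, 1, 12, 0))
    print([row["id"] for row in changed], offset)
```

An update that leaves last_update untouched would therefore never reach the Kafka topic.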

Demo 9 - Join with Driver ("static information")

In this part of the workshop, we are joining the driver ksqlDB table with the problematic_driving_s ksqlDB stream to enrich it with valuable information.

Alt Image Text

Now with the ksqlDB table in place, let's join it with the problematic_driving_s ksqlDB stream to enrich it with driver information available in the driver_t table (first_name, last_name and availability):

SELECT pd.driverId, d.first_name, d.last_name, d.available, pd.vehicleId, pd.routeId, pd.eventType 
FROM problematic_driving_s 	pd
LEFT JOIN driver_t 				d
ON pd.driverId  = d.id
EMIT CHANGES;

We can see that the join looks like it has been taken from an RDBMS-based system. The enriched stream can be seen appearing live on the ksqlDB CLI.
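Conceptually, a stream-table LEFT JOIN is a lookup of each incoming event against the current state of the table. A minimal Python sketch of that idea, with a dictionary standing in for driver_t (function name and sample data are made up for illustration):

```python
from typing import Any, Dict, Iterable, Iterator


def left_join(events: Iterable[Dict[str, Any]],
              driver_table: Dict[int, Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Enrich each event with driver data; unknown drivers yield None values."""
    for event in events:
        driver = driver_table.get(event["driverId"]) or {}
        yield {
            "driverId": event["driverId"],
            "first_name": driver.get("first_name"),
            "last_name": driver.get("last_name"),
            "available": driver.get("available"),
            "vehicleId": event["vehicleId"],
            "routeId": event["routeId"],
            "eventType": event["eventType"],
        }


if __name__ == "__main__":
    # Hypothetical table state and events.
    drivers = {20: {"first_name": "Jane", "last_name": "Doe", "available": "Y"}}
    events = [
        {"driverId": 20, "vehicleId": 53, "routeId": 1090292248,
         "eventType": "Unsafe following distance"},
        {"driverId": 99, "vehicleId": 80, "routeId": 1, "eventType": "Overspeed"},
    ]
    for row in left_join(events, drivers):
        print(row)
```

Because it is a LEFT JOIN, events of drivers missing from the table are still emitted, just without the driver columns.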

How can we make that enriched dataset (data stream) available in a more permanent fashion? We do that by creating a new Stream based on the SELECT statement just issued. Stop the query by entering CTRL-C and execute the following statement:

DROP STREAM IF EXISTS problematic_driving_and_driver_s;
CREATE STREAM IF NOT EXISTS problematic_driving_and_driver_s
  WITH (kafka_topic='problematic_driving_and_driver',
        value_format='AVRO',
        partitions=8)
AS 
SELECT pd.driverId, d.first_name, d.last_name, d.available, pd.vehicleId, pd.routeId, pd.eventType 
FROM problematic_driving_s 	pd
LEFT JOIN driver_t 				d
ON pd.driverId  = d.id;

We can use kcat on the newly created Kafka topic problematic_driving_and_driver to see the enrichment in action:

docker exec -ti kcat kcat -b kafka-1 -t problematic_driving_and_driver -s value=avro -r http://schema-registry-1:8081

Demo 10 - Aggregate Driving Behaviour

In this part of the workshop, we are using the aggregate operator count to perform aggregations over time windows.

Alt Image Text

The first one is a tumbling window of 1 hour

DROP TABLE IF EXISTS event_type_by_1hour_tumbl_t DELETE TOPIC;
CREATE TABLE event_type_by_1hour_tumbl_t AS
SELECT windowstart AS winstart
	, windowend 	AS winend
	, eventType
	, count(*) 	AS nof 
FROM problematic_driving_s 
WINDOW TUMBLING (SIZE 60 minutes)
GROUP BY eventType
EMIT CHANGES;

The second one is a hopping window of 1 hour which advances every 30 minutes.

CREATE TABLE event_type_by_1hour_hopp_t AS
SELECT windowstart AS winstart
	, windowend 	AS winend
	, eventType
	, count(*) 	AS nof 
FROM problematic_driving_s 
WINDOW HOPPING (SIZE 60 minutes, ADVANCE BY 30 minutes)
GROUP BY eventType;
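The difference between the two window types is in how many windows a single event falls into. The Python sketch below computes the window start times for a given event timestamp, assuming epoch-aligned windows. The function names are illustrative.

```python
from typing import List

MINUTE_MS = 60 * 1000


def tumbling_window_start(timestamp_ms: int, size_ms: int) -> int:
    """A tumbling window assigns each event to exactly one window."""
    return timestamp_ms - (timestamp_ms % size_ms)


def hopping_window_starts(timestamp_ms: int, size_ms: int, advance_ms: int) -> List[int]:
    """A hopping window assigns each event to every overlapping window."""
    starts = []
    start = timestamp_ms - (timestamp_ms % advance_ms)  # latest window containing the event
    # A window [start, start + size) contains the event while start > timestamp - size.
    while start > timestamp_ms - size_ms and start >= 0:
        starts.append(start)
        start -= advance_ms
    return sorted(starts)


if __name__ == "__main__":
    event_time = 100 * MINUTE_MS  # an event 100 minutes after the epoch
    print(tumbling_window_start(event_time, 60 * MINUTE_MS) // MINUTE_MS)
    print([s // MINUTE_MS for s in hopping_window_starts(event_time, 60 * MINUTE_MS, 30 * MINUTE_MS)])
```

With a size of 60 minutes and an advance of 30 minutes, each event is counted in two windows, so the counts of the hopping table overlap while those of the tumbling table do not.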

If you are doing a select on the table, you can format the time elements of the time window as shown below

SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','CET') wsf
, TIMESTAMPTOSTRING(WINDOWEND,'yyyy-MM-dd HH:mm:ss','CET') wef
, WINDOWSTART
, WINDOWEND
, eventType
, nof
FROM event_type_by_1hour_tumbl_t
EMIT CHANGES;

Demo 11 - Materialize Shipment Information ("static information")

In this part of the workshop we are integrating the shipment information from the Shipment system into a Kafka topic, so it is available for analytics.

Alt Image Text

We will use the Kafka Connect Debezium MySQL CDC Source Connector to monitor and record all row-level changes in the shipment database table and publish them to the Kafka topic sample.sample.shipment (an implementation of log-based change data capture). The connector is pre-installed as part of the dataplatform.

We are again using the CREATE CONNECTOR command for configuring the connector instead of the REST API.

First let's create the new Kafka topic

docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic sample.sample.shipment --partitions 8 --replication-factor 3

Alternatively, it could also be created as a compacted log topic

docker exec -it kafka-1 kafka-topics --bootstrap-server kafka-1:19092 --create --topic sample.sample.shipment --partitions 8 --replication-factor 3 --config cleanup.policy=compact --config segment.ms=100 --config delete.retention.ms=100 --config min.cleanable.dirty.ratio=0.001

Now we can create the connector

DROP CONNECTOR debz_shipment_sc;

CREATE SOURCE CONNECTOR debz_shipment_sc WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'mysql',
    'database.port' = '3306',
    'database.user' = 'debezium',
    'database.password' = 'dbz',
    'database.server.id' = '42',
    'database.server.name' = 'sample',
    'table.whitelist' = 'sample.shipment',
    'database.history.kafka.bootstrap.servers' = 'kafka-1:19092',
    'database.history.kafka.topic' = 'dbhistory.sample' ,
    'schema_only_recovery' = 'true',
    'include.schema.changes' = 'false',
    'transforms'= 'unwrap, extractkey',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field'= 'id',
    'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry-1:8081'
    );

Now let's create the corresponding ksqlDB table

DROP TABLE IF EXISTS shipment_t;

CREATE TABLE IF NOT EXISTS shipment_t (id VARCHAR PRIMARY KEY,
   vehicle_id INTEGER,  
   target_wkt VARCHAR)  
  WITH (kafka_topic='sample.sample.shipment', 
        value_format='AVRO');

To check that CDC works, first connect to MySQL

docker exec -it mysql bash -c 'mysql -u root -pmanager'

Then, in the ksqlDB CLI, use a select on the table to test that it is working

SELECT * FROM shipment_t EMIT CHANGES;

If you perform an INSERT in MySQL, you should see a change immediately

USE sample;

INSERT INTO shipment (id, vehicle_id, target_wkt)  VALUES (9, 49, 'POLYGON ((-91.29638671875 39.04478604850143, -91.4501953125 38.46219172306828, -90.98876953125 37.94419750075404, -89.912109375 37.78808138412046, -88.9892578125 38.37611542403604, -88.92333984375 38.77121637244273, -89.71435546875 39.470125122358176, -90.19775390625 39.825413103424786, -91.29638671875 39.04478604850143))'); 

Demo 12 - Geo-Fencing for "near" destination (to be finished)

In this part of the workshop we are using the shipment information to detect when a vehicle is near the destination of the shipment.

Alt Image Text

For that we have implemented some additional user-defined functions (UDFs), which can be used in the same way as the built-in functions of ksqlDB.

show functions;

DROP TABLE IF EXISTS shipment_by_vehicle_t;

CREATE TABLE shipment_by_vehicle_t
AS SELECT vehicle_id, collect_list(target_wkt) AS target_wkts
FROM shipment_t
GROUP BY vehicle_id;

SELECT vtr.vehicleId
		,array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),1) AS status_before
		,array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),0) AS status_now
FROM vehicle_tracking_refined_s	vtr
LEFT JOIN shipment_by_vehicle_t	sbv
ON CAST (vtr.vehicleId AS INTEGER) = sbv.vehicle_id
WHERE sbv.target_wkts IS NOT NULL
GROUP BY vehicleId
EMIT CHANGES;

CREATE TABLE geo_fence_status_t AS
SELECT vtr.vehicleId
		, geo_fence (array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),1) ,
					array_lag(collect_list(geo_fence(vtr.latitude, vtr.longitude, sbv.target_wkts[1])),0) 
					) AS status
FROM vehicle_tracking_refined_s	vtr
LEFT JOIN shipment_by_vehicle_t	sbv
ON CAST (vtr.vehicleId AS INTEGER) = sbv.vehicle_id
WHERE sbv.target_wkts IS NOT NULL
GROUP BY vehicleId
EMIT CHANGES;

SELECT vehicleId
		, geo_fence(
			array_lag(collect_list(geo_fence(latitude, longitude, 'POLYGON ((-90.626220703125 38.80118939192329, -90.62347412109375 38.460041065720446, -90.06866455078125 38.436379603, -90.04669189453125 38.792626957868904, -90.626220703125 38.80118939192329))')),1),
			array_lag(collect_list(geo_fence(latitude, longitude, 'POLYGON ((-90.626220703125 38.80118939192329, -90.62347412109375 38.460041065720446, -90.06866455078125 38.436379603, -90.04669189453125 38.792626957868904, -90.626220703125 38.80118939192329))')),0)
		) status
FROM vehicle_tracking_refined_s
GROUP BY vehicleId
EMIT CHANGES;
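The implementation of the geo_fence UDF is not shown here. As a rough idea of what such a function has to do, the following Python sketch checks whether a position lies inside a WKT polygon (ray casting) and derives a transition status from two consecutive results. Everything in it is an assumption made for illustration: the function names, the INSIDE/OUTSIDE/LEAVING labels and the transition rules. Only the ENTERING status appears in the statements of this workshop.

```python
from typing import List, Tuple

Point = Tuple[float, float]  # (longitude, latitude), the order used in WKT


def parse_wkt_polygon(wkt: str) -> List[Point]:
    """Parse the outer ring of a simple 'POLYGON ((x y, x y, ...))' string."""
    body = wkt[wkt.index("((") + 2: wkt.rindex("))")]
    return [(float(pair.split()[0]), float(pair.split()[1])) for pair in body.split(",")]


def point_in_polygon(lon: float, lat: float, ring: List[Point]) -> bool:
    """Ray casting: count how often a ray to the east crosses the polygon edges."""
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        if (y1 > lat) != (y2 > lat):  # edge spans the latitude of the point
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


def geo_fence_status(lat: float, lon: float, wkt: str) -> str:
    """Assumed labels: 'INSIDE' or 'OUTSIDE'."""
    return "INSIDE" if point_in_polygon(lon, lat, parse_wkt_polygon(wkt)) else "OUTSIDE"


def geo_fence_transition(before: str, now: str) -> str:
    """Assumed rules for combining the previous and the current status."""
    if before == "OUTSIDE" and now == "INSIDE":
        return "ENTERING"
    if before == "INSIDE" and now == "OUTSIDE":
        return "LEAVING"
    return now


if __name__ == "__main__":
    area = ("POLYGON ((-90.626220703125 38.80118939192329, -90.62347412109375 38.460041065720446, "
            "-90.06866455078125 38.436379603, -90.04669189453125 38.792626957868904, "
            "-90.626220703125 38.80118939192329))")
    before = geo_fence_status(41.87, -87.67, area)
    now = geo_fence_status(38.6, -90.3, area)
    print(before, now, geo_fence_transition(before, now))
```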

Demo 13 - Dashboard Integration (to be finished)

Alt Image Text

First let's create a stream backed by the dashboard topic, which will be the channel to the Tipboard dashboard solution.

DROP STREAM IF EXISTS dashboard_s;
CREATE STREAM IF NOT EXISTS dashboard_s
  (ROWKEY BIGINT KEY,
   tile VARCHAR,
   key VARCHAR, 
   data VARCHAR)
  WITH (kafka_topic='dashboard'
  		, partitions=1
       , value_format='JSON');

Now import the StreamSets connector between this new stream and the Tipboard dashboard.

Problematic Drivers

SELECT first_name, last_name, eventType
FROM problematic_driving_and_driver_s
EMIT CHANGES;

CREATE STREAM tipboard_text_s 
WITH (value_format = 'JSON', kafka_topic = 'tipboard_text_s', partitions=1)
AS
SELECT driverId AS ROWKEY
	   , 'text' AS tile
		, 'tweet' AS key
		, tipboard_text(concat(first_name, ' ', last_name, ' ', eventType)) AS data
FROM problematic_driving_and_driver_s
EMIT CHANGES;

Geo Fence

DROP STREAM geo_fence_status_s;

CREATE STREAM geo_fence_status_s (vehicleId STRING KEY
												, status STRING)
WITH (kafka_topic='GEO_FENCE_STATUS_T'
					, partitions=8
					, value_format='AVRO');

INSERT INTO dashboard_s
SELECT CAST (vehicleId AS BIGINT) AS ROWKEY
		, 'text' AS tile
		, 'tweet' AS key
		, tipboard_text(concat('Vehicle ', vehicleId, ' is near its destination')) AS data
FROM geo_fence_status_s
WHERE status = 'ENTERING'
PARTITION BY CAST (vehicleId AS BIGINT)
EMIT CHANGES;

Pie Chart

DROP STREAM event_type_by_1hour_tumbl_s;

CREATE STREAM event_type_by_1hour_tumbl_s (eventType STRING KEY
												, winstart BIGINT
												, winend BIGINT
												, nof BIGINT)
WITH (kafka_topic='event_type_by_1hour_tumbl_t'
					, partitions=8
					, value_format='AVRO'
					, window_type='Tumbling'
					, window_size='60 minutes');

SELECT winstart
		, collect_list(eventType) 
		, collect_list(nof) 
FROM  event_type_by_1hour_tumbl_s 
GROUP BY winstart
EMIT CHANGES;

SELECT winstart, as_map(collect_list(eventType), collect_list(nof) ) as counts
FROM  event_type_by_1hour_tumbl_s 
GROUP BY winstart
EMIT CHANGES;
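The combination of as_map with two collect_list calls pairs the collected event types with the collected counts by position. In Python terms this is a zip of two lists, as the following sketch shows (the function is a stand-in for illustration, not the ksqlDB implementation, and the sample values are made up):

```python
from typing import Dict, List


def as_map(keys: List[str], values: List[int]) -> Dict[str, int]:
    """Pair keys and values by position; in Python a repeated key keeps the last value."""
    return dict(zip(keys, values))


if __name__ == "__main__":
    # Hypothetical collected lists for one window. The same event type can
    # appear several times, because every update of a count is a new message.
    event_types = ["Overspeed", "Lane Departure", "Overspeed"]
    counts = [1, 2, 3]
    print(as_map(event_types, counts))
```

Since the stream is defined on the changelog topic of the aggregation table, it is worth checking how repeated event types within a window behave in ksqlDB before relying on the resulting map for the pie chart.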

DROP TABLE tipboard_pie_t DELETE TOPIC;

CREATE TABLE tipboard_pie_t 
WITH (value_format = 'JSON', kafka_topic = 'tipboard_pie_t', partitions=1)
AS
SELECT winstart
		, 'pie_chart' AS tile
		, 'pie' AS key
		, tipboard_pie_chart('Last Hour', as_map(collect_list(eventType), collect_list(nof) )) as data
FROM  event_type_by_1hour_tumbl_s 
GROUP BY winstart
EMIT CHANGES;