Development of a streaming event pipeline around Apache Kafka using public data from the Chicago Transit Authority. The event pipeline simulates and displays the status of train lines in real time.

Monitor the website to watch trains move from station to station.
- Docker
- Docker Compose (used for the Confluent Kafka images and PostgreSQL 11)
- Python 3.7
- The Confluent Python client, Faust, Avro, and various custom classes
The Chicago Transit Authority (CTA) required a dashboard displaying system status for its commuters. Kafka's ecosystem, with tools like REST Proxy and Kafka Connect, was explored to achieve this task.

The architecture looks like so:
Configure the train stations to emit arrival events. The sensor data simulates an action whenever a train arrives at a station.

`producers/models/producer.py`:
- Define a `value` schema for the arrival event in `producers/models/schemas/arrival_value.json` with the following attributes: `station_id`, `train_id`, `direction`, `line`, `train_status`, `prev_station_id`, `prev_direction` (a hedged sketch of such a schema follows below).
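A minimal sketch of what such a value schema could look like when loaded with `confluent_kafka.avro.loads()`; the field types and record name here are assumptions, and the project's `arrival_value.json` is authoritative:

```python
from confluent_kafka import avro

# Hedged sketch of the arrival value schema; field types and the record
# name are assumptions -- the project's arrival_value.json is authoritative.
ARRIVAL_VALUE_SCHEMA = avro.loads("""
{
    "type": "record",
    "name": "arrival_value",
    "fields": [
        {"name": "station_id", "type": "int"},
        {"name": "train_id", "type": "string"},
        {"name": "direction", "type": "string"},
        {"name": "line", "type": "string"},
        {"name": "train_status", "type": "string"},
        {"name": "prev_station_id", "type": "int"},
        {"name": "prev_direction", "type": "string"}
    ]
}
""")
```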
`producers/models/station.py`:
- A topic is created for each station in Kafka to track the arrival events.
- The station emits an `arrival` event to Kafka whenever the `Station.run()` function is called.
- Ensure that events emitted to Kafka are paired with the Avro `key` and `value` schemas (see the producer sketch after this list).
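A minimal sketch of how `Station.run()` might emit an arrival event with `confluent_kafka.avro.AvroProducer`; the per-station topic name, key contents, and example values are assumptions, not the project's exact implementation:

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Schemas are loaded from the project's schema files.
key_schema = avro.load("producers/models/schemas/arrival_key.json")
value_schema = avro.load("producers/models/schemas/arrival_value.json")

producer = AvroProducer(
    {
        "bootstrap.servers": "PLAINTEXT://localhost:9092",
        "schema.registry.url": "http://localhost:8081",
    },
    default_key_schema=key_schema,
    default_value_schema=value_schema,
)

# Emit one arrival event; the topic name, key contents, and example
# values below are assumptions for illustration only.
producer.produce(
    topic="org.chicago.cta.station.arrivals.some_station",
    key={"timestamp": 1600000000000},
    value={
        "station_id": 40010,
        "train_id": "BL123",
        "direction": "b",
        "line": "blue",
        "train_status": "in_service",
        "prev_station_id": 40020,
        "prev_direction": "a",
    },
)
producer.flush()
```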
- Define a `value` schema for the turnstile event in `producers/models/schemas/turnstile_value.json` with the attributes `station_id`, `station_name`, `line`.
`producers/models/turnstile.py`:
- A topic is created for each turnstile, for each station, in Kafka to track the turnstile events.
- A `turnstile` event is emitted to Kafka whenever the `Turnstile.run()` function is called.
- Events are emitted with the Avro `key` and `value` schemas.
Our partners at the CTA have asked us to send weather readings into Kafka from their weather hardware. Unfortunately, this hardware is old and we cannot use the Python client library due to hardware restrictions. Instead, HTTP REST requests send the data from the hardware to Kafka through Kafka's REST Proxy.
- Define a `value` schema for the weather event in `producers/models/schemas/weather_value.json` with the attributes `temperature`, `status`.
`producers/models/weather.py`:
- A topic is created for weather events.
- The weather model emits a `weather` event to Kafka REST Proxy whenever the `Weather.run()` function is called.
  - NOTE: HTTP requests to Kafka REST Proxy need to include the correct `Content-Type`.
- Events emitted to REST Proxy are paired with the Avro `key` and `value` schemas (a hedged `requests` sketch follows this list).
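A minimal sketch of posting a weather event through REST Proxy with `requests`; the topic name and inline schema are assumptions, while the `Content-Type` shown is the Avro media type REST Proxy expects for Avro-encoded produce requests:

```python
import json

import requests

REST_PROXY_URL = "http://localhost:8082"

# Hedged sketch of the weather value schema; the project's
# weather_value.json is authoritative.
WEATHER_VALUE_SCHEMA = json.dumps(
    {
        "type": "record",
        "name": "weather_value",
        "fields": [
            {"name": "temperature", "type": "float"},
            {"name": "status", "type": "string"},
        ],
    }
)

# The topic name below is an assumption for illustration only.
resp = requests.post(
    f"{REST_PROXY_URL}/topics/org.chicago.cta.weather",
    headers={"Content-Type": "application/vnd.kafka.avro.v2+json"},
    data=json.dumps(
        {
            "value_schema": WEATHER_VALUE_SCHEMA,
            "records": [{"value": {"temperature": 70.5, "status": "sunny"}}],
        }
    ),
)
resp.raise_for_status()
```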
Extract station information from the PostgreSQL database into Kafka using the Kafka Connect JDBC source connector.

`producers/connector.py`:
- See the Kafka Connect JDBC Source Connector Configuration Options for the available settings.
- Use the Landoop Kafka Connect UI and Landoop Kafka Topics UI to check the status and output of the connector.
- To delete a misconfigured connector: `curl -X DELETE localhost:8083/connectors/stations` (a hedged registration sketch follows this list).
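A minimal sketch of registering the JDBC source connector against Kafka Connect's REST API; the table, incrementing column, topic prefix, and converter settings are assumptions chosen as plausible values for this project, so check the Configuration Options reference above for the exact options you need:

```python
import json

import requests

KAFKA_CONNECT_URL = "http://localhost:8083/connectors"

# Hedged sketch: the table, incrementing column, topic prefix, and
# converter settings below are assumptions, not the project's exact config.
connector = {
    "name": "stations",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "connection.url": "jdbc:postgresql://postgres:5432/cta",
        "connection.user": "cta_admin",
        "connection.password": "chicago",
        "table.whitelist": "stations",
        "mode": "incrementing",
        "incrementing.column.name": "stop_id",
        "topic.prefix": "org.chicago.cta.connect.",
        "poll.interval.ms": "60000",
    },
}

resp = requests.post(
    KAFKA_CONNECT_URL,
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
```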
Faust stream processing transforms the raw stations table from Kafka Connect. The raw format from the database has superfluous data, and the line color information is not conveniently structured. We need to ingest the data from our Kafka Connect topic and transform it.

`consumers/faust_stream.py`:
- Run this Faust processing application with the following command: `faust -A faust_stream worker -l info`
- In PyCharm, use `worker` under Parameters (a hedged agent sketch follows this list).
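A minimal sketch of the kind of Faust agent `consumers/faust_stream.py` could contain; the input and output topic names and the raw record's field layout are assumptions:

```python
import faust


# Raw record shape coming from the Kafka Connect topic
# (field names here are assumptions about the stations table).
class Station(faust.Record):
    stop_id: int
    station_name: str
    red: bool
    blue: bool
    green: bool


# Slimmed-down record with a single, convenient line color field.
class TransformedStation(faust.Record):
    station_id: int
    station_name: str
    line: str


app = faust.App("stations-stream", broker="kafka://localhost:9092")

# Input/output topic names are assumptions.
raw_topic = app.topic("org.chicago.cta.connect.stations", value_type=Station)
out_topic = app.topic("org.chicago.cta.stations.transformed", partitions=1)


@app.agent(raw_topic)
async def transform_stations(stations):
    async for station in stations:
        # Collapse the three boolean color columns into one "line" field
        # (assumes exactly one of them is set per row).
        line = "red" if station.red else "blue" if station.blue else "green"
        await out_topic.send(
            value=TransformedStation(
                station_id=station.stop_id,
                station_name=station.station_name,
                line=line,
            ),
        )
```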
Aggregate turnstile data for each of our stations. We need a count of turnstile events in a streaming format, summarized by station for an up-to-date count.

`consumers/ksql.py`:
- The KSQL CLI is available by running `ksql`.
- Run the script with `python ksql.py`.
- To remove a table, run `DROP TABLE <your_table>` from the CLI; to terminate a running query, run `TERMINATE <query_name>` (a hedged REST API sketch follows this list).
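A minimal sketch of submitting KSQL statements over the KSQL server's REST API, roughly what `consumers/ksql.py` needs to do; the table names, source topic, and column types are assumptions:

```python
import json

import requests

KSQL_URL = "http://localhost:8088"

# Table names, the source topic, and column types below are assumptions.
KSQL_STATEMENTS = """
CREATE TABLE turnstile (
    station_id INT,
    station_name VARCHAR,
    line VARCHAR
) WITH (
    KAFKA_TOPIC='org.chicago.cta.turnstiles',
    VALUE_FORMAT='AVRO',
    KEY='station_id'
);

CREATE TABLE turnstile_summary
WITH (VALUE_FORMAT='JSON') AS
    SELECT station_id, COUNT(station_id) AS count
    FROM turnstile
    GROUP BY station_id;
"""

resp = requests.post(
    f"{KSQL_URL}/ksql",
    headers={"Content-Type": "application/vnd.ksql.v1+json"},
    data=json.dumps({"ksql": KSQL_STATEMENTS, "streamsProperties": {}}),
)
resp.raise_for_status()
```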
Consume the data in the web server to serve the transit status pages to commuters (a minimal consume-loop sketch follows the file list).

- `consumers/consumer.py`
- `consumers/models/line.py`
- `consumers/models/weather.py`
- `consumers/models/station.py`
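A minimal sketch of the consume loop the web server builds on, using `confluent_kafka.avro.AvroConsumer`; the group id and topic pattern are assumptions:

```python
from confluent_kafka.avro import AvroConsumer

# Group id and topic pattern are assumptions for this sketch.
consumer = AvroConsumer(
    {
        "bootstrap.servers": "PLAINTEXT://localhost:9092",
        "schema.registry.url": "http://localhost:8081",
        "group.id": "transit-status-dashboard",
        "auto.offset.reset": "earliest",
    }
)
# A leading "^" makes subscribe() treat the topic name as a regex.
consumer.subscribe(["^org.chicago.cta.station.arrivals."])

try:
    while True:
        message = consumer.poll(1.0)
        if message is None:
            continue
        if message.error() is not None:
            print(f"consumer error: {message.error()}")
            continue
        # Hand the deserialized dict off to the relevant model
        # (Line, Station, Weather) to update the dashboard state.
        print(message.topic(), message.value())
finally:
    consumer.close()
```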
- Confluent Python Client Documentation
- Confluent Python Client Usage and Examples
- REST Proxy API Reference
- Kafka Connect JDBC Source Connector Configuration Options
The project consists of two main directories, `producers` and `consumers`. The following directory layout indicates the project's files:
```
├── consumers
│   ├── consumer.py *
│   ├── faust_stream.py *
│   ├── ksql.py *
│   ├── models
│   │   ├── lines.py
│   │   ├── line.py *
│   │   ├── station.py *
│   │   └── weather.py *
│   ├── requirements.txt
│   ├── server.py
│   ├── topic_check.py
│   └── templates
│       └── status.html
└── producers
    ├── connector.py *
    ├── models
    │   ├── line.py
    │   ├── producer.py *
    │   ├── schemas
    │   │   ├── arrival_key.json
    │   │   ├── arrival_value.json *
    │   │   ├── turnstile_key.json
    │   │   ├── turnstile_value.json *
    │   │   ├── weather_key.json
    │   │   └── weather_value.json *
    │   ├── station.py *
    │   ├── train.py
    │   ├── turnstile.py *
    │   ├── turnstile_hardware.py
    │   └── weather.py *
    ├── requirements.txt
    └── simulation.py
```
```
%> docker-compose up
```
| Service | Host URL | Docker URL | Username | Password |
|---|---|---|---|---|
| Public Transit Status | http://localhost:8888 | n/a | | |
| Landoop Kafka Connect UI | http://localhost:8084 | http://connect-ui:8084 | | |
| Landoop Kafka Topics UI | http://localhost:8085 | http://topics-ui:8085 | | |
| Landoop Schema Registry UI | http://localhost:8086 | http://schema-registry-ui:8086 | | |
| Kafka | PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 | PLAINTEXT://kafka0:9092,PLAINTEXT://kafka1:9093,PLAINTEXT://kafka2:9094 | | |
| REST Proxy | http://localhost:8082 | http://rest-proxy:8082/ | | |
| Schema Registry | http://localhost:8081 | http://schema-registry:8081/ | | |
| Kafka Connect | http://localhost:8083 | http://kafka-connect:8083 | | |
| KSQL | http://localhost:8088 | http://ksql:8088 | | |
| PostgreSQL | jdbc:postgresql://localhost:5432/cta | jdbc:postgresql://postgres:5432/cta | cta_admin | chicago |
To access services from your local machine, use the Host URL column.

When configuring services that run within Docker Compose, like Kafka Connect, you must use the Docker URL. For example, when you configure the JDBC Source Kafka Connector, use the value from the Docker URL column.
There are two pieces to the simulation, the `producer` and the `consumer`. In development, only run one piece of the project at a time.
To run the producer simulation:

```
cd producers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python simulation.py
```

Once the simulation is running, you may hit `Ctrl+C` at any time to exit.
To run the Faust stream processing application:

```
cd consumers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
faust -A faust_stream worker -l info
```
To run the KSQL creation script:

```
cd consumers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python ksql.py
```
**NOTE**: Do not run the consumer until you have reached Step 6!
To run the consumer web server:

```
cd consumers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python server.py
```

Once the server is running, you may hit `Ctrl+C` at any time to exit.