In this hands-on workshop, we'll learn how to process real-time streaming data using SQL in RisingWave, an open-source SQL database for processing and managing streaming data. RisingWave's user experience should feel familiar, as it's fully wire compatible with PostgreSQL.
We will use the NYC Taxi dataset, which contains information about taxi trips in New York City.
We'll cover the following topics in this workshop:
- Why Stream Processing?
- Stateless Computation (Filters, Projections)
- Stateful Computation (Aggregations, Joins)
- Data Ingestion and Delivery
RisingWave in 10 Minutes: https://tutorials.risingwave.com/docs/intro
- Docker and Docker Compose
- Python 3.7 or later
- pip and virtualenv for Python
- psql (I use PostgreSQL-14.9)
- Clone this repository:
git clone git@github.com:risingwavelabs/risingwave-data-talks-workshop-2024-03-04.git
cd risingwave-data-talks-workshop-2024-03-04
Or, if you prefer HTTPS:
git clone https://github.com/risingwavelabs/risingwave-data-talks-workshop-2024-03-04.git
cd risingwave-data-talks-workshop-2024-03-04
The NYC Taxi dataset is a public dataset that contains information about taxi trips in New York City. The dataset is available in Parquet format and can be downloaded from the NYC Taxi & Limousine Commission website.
We will be using the following files from the dataset:
- yellow_tripdata_2022-01.parquet
- taxi_zone.csv
For your convenience, these have already been downloaded and are available in the data directory.
The file seed_kafka.py contains the logic to process the data and populate RisingWave.
In this workshop, we will replace the timestamp fields in the trip_data with timestamps close to the current time. That's because yellow_tripdata_2022-01.parquet contains historical data from 2022, and we want to simulate processing real-time data.
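The timestamp replacement described above can be sketched in a few lines of Python. This is only an illustration and not the code in seed_kafka.py: the helper name `shift_to_now` and the choice to keep each trip's duration while ending it at the current time are assumptions.

```python
from datetime import datetime, timedelta

def shift_to_now(pickup, dropoff, now):
    """Move a historical trip so that it ends at `now`, keeping its duration.

    Hypothetical helper for illustration; seed_kafka.py may do this differently.
    """
    duration = dropoff - pickup
    return now - duration, now

# A trip recorded in January 2022 that lasted 15 minutes.
old_pickup = datetime(2022, 1, 1, 0, 35, 40)
old_dropoff = old_pickup + timedelta(minutes=15)

new_pickup, new_dropoff = shift_to_now(old_pickup, old_dropoff, datetime.now())
print(new_pickup, new_dropoff)
```

With timestamps rewritten this way, filters such as "dropped off within the last minute" match freshly ingested rows instead of excluding everything as two years old.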
$ tree -L 1
.
├── README.md          # This file
├── clickhouse-sql     # SQL scripts for Clickhouse
├── commands.sh        # Commands to operate the cluster
├── data               # Data files (trip_data, taxi_zone)
├── docker             # Contains docker compose files
├── requirements.txt   # Python dependencies
├── risingwave-sql     # SQL scripts for RisingWave (includes some homework files)
└── seed_kafka.py      # Python script to seed Kafka
Before getting your hands dirty with the project, we will:
- Run some diagnostics.
- Start the RisingWave cluster.
- Set up our Python environment.
# Check version of psql
psql --version
source commands.sh
# Start the RW cluster
start-cluster
# Set up Python
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
commands.sh contains several commands to operate the cluster. You may reference it to see what commands are available.
Now you're ready to start on the workshop!
CREATE MATERIALIZED VIEW latest_1min_trip_data AS
    SELECT
        taxi_zone.Zone AS pickup_zone,
        taxi_zone_1.Zone AS dropoff_zone,
        tpep_pickup_datetime,
        tpep_dropoff_datetime
    FROM trip_data
    JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
    JOIN taxi_zone AS taxi_zone_1 ON trip_data.DOLocationID = taxi_zone_1.location_id
    WHERE tpep_dropoff_datetime > now() - interval '1 minute';
CREATE MATERIALIZED VIEW total_airport_pickups AS
    SELECT
        count(*) AS cnt,
        taxi_zone.Zone
    FROM trip_data
    JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Zone LIKE '%Airport'
    GROUP BY taxi_zone.Zone;
CREATE MATERIALIZED VIEW airport_pu AS
    SELECT
        tpep_pickup_datetime,
        pulocationid
    FROM trip_data
    JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
CREATE MATERIALIZED VIEW latest_jfk_pickup AS
    SELECT
        max(tpep_pickup_datetime) AS latest_pickup_time
    FROM trip_data
    JOIN taxi_zone ON trip_data.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
CREATE MATERIALIZED VIEW jfk_pickups_1hr_before AS
    SELECT
        count(*) AS cnt
    FROM airport_pu
    JOIN latest_jfk_pickup
        ON airport_pu.tpep_pickup_datetime > latest_jfk_pickup.latest_pickup_time - interval '1 hour'
    JOIN taxi_zone ON airport_pu.PULocationID = taxi_zone.location_id
    WHERE taxi_zone.Borough = 'Queens'
      AND taxi_zone.Zone = 'JFK Airport';
CREATE MATERIALIZED VIEW busiest_zones_1_min AS
    SELECT
        taxi_zone.Zone AS dropoff_zone,
        count(*) AS last_1_min_dropoff_cnt
    FROM trip_data
    JOIN taxi_zone ON trip_data.DOLocationID = taxi_zone.location_id
    WHERE trip_data.tpep_dropoff_datetime > (NOW() - INTERVAL '1' MINUTE)
    GROUP BY taxi_zone.Zone
    ORDER BY last_1_min_dropoff_cnt DESC
    LIMIT 10;
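As a rough mental model of what busiest_zones_1_min returns, the following pure-Python sketch filters drop-offs to the last minute, counts them per zone, and keeps the ten largest counts. The function name, parameters, and sample rows are made up for illustration. RisingWave keeps the view up to date continuously as now() advances, whereas this sketch evaluates the query once for a fixed `now`.

```python
from collections import Counter
from datetime import datetime, timedelta

def busiest_zones(dropoffs, now, window=timedelta(minutes=1), limit=10):
    """Count drop-offs per zone within `window` of `now`; return the top `limit`.

    `dropoffs` is an iterable of (zone_name, dropoff_time) pairs.
    """
    cutoff = now - window
    counts = Counter(zone for zone, dropoff_time in dropoffs if dropoff_time > cutoff)
    return counts.most_common(limit)

now = datetime(2024, 3, 4, 12, 0, 0)
dropoffs = [
    ("JFK Airport", now - timedelta(seconds=10)),
    ("JFK Airport", now - timedelta(seconds=30)),
    ("Midtown Center", now - timedelta(seconds=45)),
    ("Midtown Center", now - timedelta(minutes=5)),  # too old, filtered out
]
print(busiest_zones(dropoffs, now))
```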
CREATE MATERIALIZED VIEW longest_trip_1_min AS
    SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        taxi_zone_pu.Zone AS pickup_zone,
        taxi_zone_do.Zone AS dropoff_zone,
        trip_distance
    FROM trip_data
    JOIN taxi_zone AS taxi_zone_pu ON trip_data.PULocationID = taxi_zone_pu.location_id
    JOIN taxi_zone AS taxi_zone_do ON trip_data.DOLocationID = taxi_zone_do.location_id
    WHERE trip_data.tpep_pickup_datetime > (NOW() - INTERVAL '5' MINUTE)
    ORDER BY trip_distance DESC
    LIMIT 10;
CREATE MATERIALIZED VIEW avg_fare_amt AS
    SELECT
        avg(fare_amount) AS avg_fare_amount_per_min,
        count(*) AS num_rides_per_min,
        window_start,
        window_end
    FROM TUMBLE(trip_data, tpep_pickup_datetime, INTERVAL '1' MINUTE)
    GROUP BY window_start, window_end
    ORDER BY num_rides_per_min ASC;
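To make the windowing in avg_fare_amt concrete, here is a small pure-Python sketch of what a one-minute tumbling window computes: each row falls into exactly one fixed, non-overlapping window based on its pickup time, and the aggregates are computed per window. The function names and sample rows are invented for illustration, and aligning windows to the Unix epoch is an assumption of this sketch. It says nothing about how RisingWave implements TUMBLE internally.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble_start(ts, size):
    """Return the start of the fixed-size window containing `ts`."""
    return ts - (ts - EPOCH) % size

def avg_fare_per_window(rows, size=timedelta(minutes=1)):
    """Group (pickup_time, fare_amount) rows into tumbling windows."""
    fares = defaultdict(list)
    for pickup_time, fare_amount in rows:
        fares[tumble_start(pickup_time, size)].append(fare_amount)
    return [
        {
            "window_start": start,
            "window_end": start + size,
            "avg_fare_amount_per_min": sum(vals) / len(vals),
            "num_rides_per_min": len(vals),
        }
        for start, vals in sorted(fares.items())
    ]

# Made-up sample rows: two rides in the 12:00 window, one in the 12:01 window.
rows = [
    (datetime(2024, 3, 4, 12, 0, 5), 10.0),
    (datetime(2024, 3, 4, 12, 0, 50), 20.0),
    (datetime(2024, 3, 4, 12, 1, 10), 7.5),
]
for window in avg_fare_per_window(rows):
    print(window)
```

The sketch sorts by window start for readability, while the view above orders by num_rides_per_min.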