Stream processing in SQL with RisingWave

In this hands-on workshop, we’ll learn how to process real-time streaming data using SQL in RisingWave, an open-source SQL database for processing and managing streaming data. RisingWave’s user experience should feel familiar, as it’s fully wire-compatible with PostgreSQL.
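
Because RisingWave speaks the PostgreSQL wire protocol, any Postgres client can talk to it. For example, once the cluster from the Getting Started section below is running, you can connect with psql and run ordinary SQL; the host, port, database, and user shown here assume the default local setup:

-- Assumed connection defaults for the local cluster:
--   psql -h localhost -p 4566 -d dev -U root
-- Once connected, regular PostgreSQL-style statements work as-is:
SELECT version();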

We will use the NYC Taxi dataset, which contains information about taxi trips in New York City.

We’ll cover the following topics in this workshop:

  • Why Stream Processing?
  • Stateless computation (filters, projections)
  • Stateful computation (aggregations, joins)
  • Data Ingestion and Delivery

RisingWave in 10 Minutes: https://tutorials.risingwave.com/docs/intro

Prerequisites

  1. Docker and Docker Compose
  2. Python 3.7 or later
  3. pip and virtualenv for Python
  4. psql (I use PostgreSQL-14.9)
  5. Clone this repository:
    git clone git@github.com:risingwavelabs/risingwave-data-talks-workshop-2024-03-04.git
    cd risingwave-data-talks-workshop-2024-03-04
    Or, if you prefer HTTPS:
    git clone https://github.com/risingwavelabs/risingwave-data-talks-workshop-2024-03-04.git
    cd risingwave-data-talks-workshop-2024-03-04

Note on the dataset

The NYC Taxi dataset is a public dataset containing information about taxi trips in New York City. The trip data is published in Parquet format and can be downloaded from the NYC Taxi & Limousine Commission website.

We will be using the following files from the dataset:

  • yellow_tripdata_2022-01.parquet
  • taxi_zone.csv

For your convenience, these have already been downloaded and are available in the data directory.

The file seed_kafka.py contains the logic to process the data and populate RisingWave.

In this workshop, we will replace the timestamp fields in the trip_data with timestamps close to the current time. That's because yellow_tripdata_2022-01.parquet contains historical data from 2022, and we want to simulate processing real-time data.

Project Structure

$ tree -L 1
.
β”œβ”€β”€ README.md                   # This file
β”œβ”€β”€ clickhouse-sql              # SQL scripts for Clickhouse
β”œβ”€β”€ commands.sh                 # Commands to operate the cluster
β”œβ”€β”€ data                        # Data files (trip_data, taxi_zone)
β”œβ”€β”€ docker                      # Contains docker compose files
β”œβ”€β”€ requirements.txt            # Python dependencies
β”œβ”€β”€ risingwave-sql              # SQL scripts for RisingWave (includes some homework files)
└── seed_kafka.py               # Python script to seed Kafka

Getting Started

Before getting your hands dirty with the project, we will:

  1. Run some diagnostics.
  2. Start the RisingWave cluster.
  3. Set up our Python environment.

# Check the version of psql
psql --version

# Load the helper commands defined in commands.sh (start-cluster, etc.)
source commands.sh

# Start the RisingWave cluster
start-cluster

# Set up the Python environment
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

commands.sh contains several commands for operating the cluster. Refer to it to see which commands are available.

Now you're ready to start the workshop!
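
Materialized Views

The materialized views below reference a trip_data stream and a taxi_zone lookup table. Both are created by the SQL under risingwave-sql (with seed_kafka.py pushing the trip events into Kafka), so you don't need to write them yourself. As a rough sketch of what those definitions look like, trimmed to the columns used below (the topic name, broker address, and exact column types here are assumptions, not the repo's actual values):

-- Sketch only: see risingwave-sql for the real definitions.
CREATE TABLE taxi_zone (
    location_id INT,
    Borough VARCHAR,
    Zone VARCHAR,
    service_zone VARCHAR
);

-- Trip events arriving via Kafka (topic and broker names are assumed).
CREATE SOURCE trip_data (
    tpep_pickup_datetime TIMESTAMPTZ,
    tpep_dropoff_datetime TIMESTAMPTZ,
    PULocationID INT,
    DOLocationID INT,
    trip_distance DOUBLE PRECISION,
    fare_amount DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'trip_data',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;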

-- Trips dropped off within the last minute, with pickup and dropoff zone names.
CREATE MATERIALIZED VIEW latest_1min_trip_data AS
SELECT
    taxi_zone.Zone AS pickup_zone,
    taxi_zone_1.Zone AS dropoff_zone,
    tpep_pickup_datetime,
    tpep_dropoff_datetime
FROM trip_data
JOIN taxi_zone
    ON trip_data.PULocationID = taxi_zone.location_id
JOIN taxi_zone AS taxi_zone_1
    ON trip_data.DOLocationID = taxi_zone_1.location_id
WHERE tpep_dropoff_datetime > now() - interval '1 minute';

-- Total number of pickups per airport zone.
CREATE MATERIALIZED VIEW total_airport_pickups AS
SELECT
    count(*) AS cnt,
    taxi_zone.Zone
FROM trip_data
JOIN taxi_zone
    ON trip_data.PULocationID = taxi_zone.location_id
WHERE taxi_zone.Zone LIKE '%Airport'
GROUP BY taxi_zone.Zone;

-- All pickups at JFK Airport.
CREATE MATERIALIZED VIEW airport_pu AS
SELECT
    tpep_pickup_datetime,
    pulocationid
FROM trip_data
JOIN taxi_zone
    ON trip_data.PULocationID = taxi_zone.location_id
WHERE taxi_zone.Borough = 'Queens'
  AND taxi_zone.Zone = 'JFK Airport';

-- Most recent pickup time observed at JFK Airport.
CREATE MATERIALIZED VIEW latest_jfk_pickup AS
SELECT
    max(tpep_pickup_datetime) AS latest_pickup_time
FROM trip_data
JOIN taxi_zone
    ON trip_data.PULocationID = taxi_zone.location_id
WHERE taxi_zone.Borough = 'Queens'
  AND taxi_zone.Zone = 'JFK Airport';

-- Number of JFK pickups in the hour leading up to the latest JFK pickup.
CREATE MATERIALIZED VIEW jfk_pickups_1hr_before AS
SELECT
    count(*) AS cnt
FROM airport_pu
JOIN latest_jfk_pickup
    ON airport_pu.tpep_pickup_datetime > latest_jfk_pickup.latest_pickup_time - interval '1 hour'
JOIN taxi_zone
    ON airport_pu.PULocationID = taxi_zone.location_id
WHERE taxi_zone.Borough = 'Queens'
  AND taxi_zone.Zone = 'JFK Airport';

-- Top 10 dropoff zones over the last minute.
CREATE MATERIALIZED VIEW busiest_zones_1_min AS
SELECT
    taxi_zone.Zone AS dropoff_zone,
    count(*) AS last_1_min_dropoff_cnt
FROM trip_data
JOIN taxi_zone
    ON trip_data.DOLocationID = taxi_zone.location_id
WHERE trip_data.tpep_dropoff_datetime > (NOW() - INTERVAL '1' MINUTE)
GROUP BY taxi_zone.Zone
ORDER BY last_1_min_dropoff_cnt DESC
LIMIT 10;

-- Top 10 longest trips (by distance) picked up in the last 5 minutes.
CREATE MATERIALIZED VIEW longest_trip_1_min AS
SELECT
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    taxi_zone_pu.Zone AS pickup_zone,
    taxi_zone_do.Zone AS dropoff_zone,
    trip_distance
FROM trip_data
JOIN taxi_zone AS taxi_zone_pu
    ON trip_data.PULocationID = taxi_zone_pu.location_id
JOIN taxi_zone AS taxi_zone_do
    ON trip_data.DOLocationID = taxi_zone_do.location_id
WHERE trip_data.tpep_pickup_datetime > (NOW() - INTERVAL '5' MINUTE)
ORDER BY trip_distance DESC
LIMIT 10;

-- Average fare amount and ride count per 1-minute tumbling window of pickup time.
CREATE MATERIALIZED VIEW avg_fare_amt AS
SELECT
    avg(fare_amount) AS avg_fare_amount_per_min,
    count(*) AS num_rides_per_min,
    window_start,
    window_end
FROM TUMBLE(trip_data, tpep_pickup_datetime, INTERVAL '1' MINUTE)
GROUP BY window_start, window_end
ORDER BY num_rides_per_min ASC;
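
Each of these materialized views can be queried like an ordinary table, and RisingWave keeps the results incrementally up to date as new trip events arrive. For example, from psql:

-- The views behave like regular tables when queried:
SELECT * FROM busiest_zones_1_min;
SELECT * FROM avg_fare_amt ORDER BY window_start DESC LIMIT 5;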