/streaming-playground

Testing Arroyo with Bufstream

Streaming Adventures

Experiment with:

Prerequisites

Install the rpk CLI to use as the Kafka CLI

brew install redpanda-data/tap/redpanda
# add zsh completions
rpk generate shell-completion zsh > "${fpath[1]}/_rpk"

Create an rpk profile to connect to the local Redpanda Kafka cluster

rpk profile create local \
-s brokers=localhost:19092 \
-s registry.hosts=localhost:8081 \
-s admin.hosts=localhost:9644

Install the psql CLI on macOS

brew install libpq
# Finally, symlink psql (and other libpq tools) into /usr/local/bin:
brew link --force libpq
# to connect to local database
psql "postgresql://postgres:postgres@localhost/postgres?sslmode=require"

Start

First time setup

# pull Docker images locally
docker compose --profile optional pull
docker compose up
# docker compose --profile optional up
docker compose ps
open http://localhost:5115/ # Arroyo Console
open http://localhost:8080/ # Redpanda Console
open http://localhost:8081/subjects # Redpanda Registry
docker compose down
# (DANGER) - shut down and delete volumes
docker compose down -v
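
After `docker compose up`, it can help to confirm which services are actually listening before opening the consoles. The following is a minimal sketch using only the Python standard library. The port numbers come from the commands above, except 5432, which is assumed because the psql URL names no port and that is the PostgreSQL default. The `SERVICES` table and function names are illustrative, not part of this repo.

```python
import socket

# Host ports taken from the commands in this README.
# 5432 is an assumption: the psql URL names no port, so the PostgreSQL default applies.
SERVICES = {
    "Arroyo Console": 5115,
    "Redpanda Console": 8080,
    "Redpanda Registry": 8081,
    "Redpanda Kafka API": 19092,
    "Redpanda Admin API": 9644,
    "Postgres (assumed default port)": 5432,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, up in check_services().items():
        print(f"{name:35s} {'up' if up else 'down'}")
```

Optional services show as down unless the stack was started with `--profile optional`. An open port only means something is listening, not that the service is healthy.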

Benthos example

# start Redpanda Connect (formerly Benthos)
docker compose up connect
docker compose down
# (DANGER) - shut down and delete volumes
docker compose down -v

This will start:

  1. Postgres Database
  2. Kafka - Redpanda or Bufstream
  3. Redpanda Console
  4. Redpanda Connect (optional)
  5. MinIO (optional)
  6. ClickHouse (optional)
  7. Arroyo

Config

Kafka

Add new topics

Tip

You can also use Redpanda Console to create topics.

rpk topic list
rpk topic create -r 1 -p 1 customer-source
rpk topic create -r 1 -p 1 customer-sink

Arroyo Pipeline

From the Arroyo Console, create a pipeline with:

Warning

By default, preview does not write to sinks, to avoid accidentally writing bad data. To run the pipeline for real, click "Launch", or enable web sinks in preview.

CREATE TABLE customer_source (
    name TEXT,
    age INT,
    phone TEXT
) WITH (
    connector = 'kafka',
    format = 'json',
    type = 'source',
    bootstrap_servers = 'redpanda:9092',
    topic = 'customer-source'
);

CREATE TABLE customer_sink (
    count BIGINT,
    age INT
) WITH (
    connector = 'kafka',
    format = 'json',
    type = 'sink',
    bootstrap_servers = 'redpanda:9092',
    topic = 'customer-sink'
);

SELECT count(*), age
FROM customer_source
GROUP BY age, hop(interval '2 seconds', interval '10 seconds');

INSERT INTO customer_sink SELECT count(*), age
FROM customer_source
GROUP BY age, hop(interval '2 seconds', interval '10 seconds');
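
To see why one message can show up in several output rows, here is a small Python sketch of what `hop(interval '2 seconds', interval '10 seconds')` does: a 10-second window that advances every 2 seconds, so each event falls into five overlapping windows. This is a simulation, not Arroyo code. It assumes window starts are aligned to multiples of the slide and that the width is a multiple of the slide. The event data is made up.

```python
from collections import Counter


def hop_window_starts(ts: int, slide: int, width: int) -> list:
    """Start times of all hopping windows [start, start + width) that contain ts.

    Assumes window starts are aligned to multiples of `slide`
    and that `width` is a multiple of `slide`.
    """
    last = (ts // slide) * slide   # latest window start <= ts
    first = last - width + slide   # earliest start whose window still covers ts
    return list(range(first, last + 1, slide))


def hop_counts(events, slide=2, width=10):
    """Count events per (window_start, age), like GROUP BY age, hop(slide, width)."""
    counts = Counter()
    for ts, age in events:
        for start in hop_window_starts(ts, slide, width):
            counts[(start, age)] += 1
    return counts


# Made-up events: (event time in seconds, age)
events = [(11, 70), (12, 70), (13, 30)]
counts = hop_counts(events)
for (start, age), n in sorted(counts.items()):
    print(f"window [{start}, {start + 10})  age={age}  count={n}")
```

With these inputs, the event at second 11 lands in the windows starting at 2, 4, 6, 8, and 10, which is why a single published message produces several records in customer-sink.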

Test

Publish a couple of messages to the customer-source topic using Redpanda Console, e.g.:

Important

Use TYPE: JSON

{
    "name": "sumo",
    "age": 70,
    "phone": "111-222-4444"
}

Check for new messages in the customer-sink topic.
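
For more than a couple of messages, a script that prints one JSON document per line can be handy. The sketch below generates made-up records with the same three fields as `customer_source`. The names, ages, and phone pattern are illustrative values, not data from this repo.

```python
import json
import random


def make_customer(rng: random.Random) -> dict:
    """Build one made-up record with the customer_source columns."""
    return {
        "name": rng.choice(["sumo", "alice", "bob"]),  # hypothetical names
        "age": rng.choice([30, 50, 70]),               # few distinct ages so groups fill up
        "phone": f"111-222-{rng.randint(0, 9999):04d}",
    }


def make_lines(n: int, seed: int = 0) -> list:
    """Return n JSON documents, one per line, ready to pipe to a producer."""
    rng = random.Random(seed)
    return [json.dumps(make_customer(rng)) for _ in range(n)]


if __name__ == "__main__":
    for line in make_lines(10):
        print(line)
```

Saved under a name of your choice (say, a hypothetical `gen_customers.py`), the output could be piped into `rpk topic produce customer-source`, which should read one record per line from standard input by default. Check `rpk topic produce --help` for your version.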

Examples

Try more examples in Arroyo Console: http://localhost:5115/

basic_tumble_aggregate

CREATE TABLE nexmark WITH (
    connector = 'nexmark',
    event_rate = 10
);

SELECT
    bid.auction as auction,
    tumble(INTERVAL '1' second) as window,
    count(*) as count
FROM
    nexmark
where
    bid is not null
GROUP BY
    1,
    2
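
As a rough mental model of `tumble(INTERVAL '1' second)`, the Python sketch below assigns each bid to exactly one fixed, non-overlapping one-second window and counts bids per auction and window. It is a simulation under the assumption that windows are aligned to whole multiples of the width. The bid data is made up.

```python
from collections import Counter


def tumble_start(ts: float, width: float = 1.0) -> float:
    """Start of the tumbling window [start, start + width) containing ts."""
    return (ts // width) * width


def tumble_counts(bids, width=1.0):
    """Count bids per (auction, window_start), like GROUP BY 1, 2 in the query."""
    counts = Counter()
    for ts, auction in bids:
        counts[(auction, tumble_start(ts, width))] += 1
    return counts


# Made-up bids: (event time in seconds, auction id)
bids = [(0.2, 1001), (0.7, 1001), (0.9, 1002), (1.1, 1001)]
for (auction, start), n in sorted(tumble_counts(bids).items()):
    print(f"auction={auction}  window=[{start}, {start + 1.0})  count={n}")
```

Unlike a hopping window, each event is counted once, so the per-window counts add up to the number of bids.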

bitcoin_exchange_rate

CREATE TABLE coinbase (
  type TEXT,
  price TEXT
) WITH (
  connector = 'websocket',
  endpoint = 'wss://ws-feed.exchange.coinbase.com',
  subscription_message = '{
      "type": "subscribe",
      "product_ids": [
        "BTC-USD"
      ],
      "channels": ["ticker"]
    }',
  format = 'json'
);

SELECT avg(CAST(price as FLOAT)) from coinbase
WHERE type = 'ticker'
GROUP BY hop(interval '5' second, interval '1 minute');
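
The table declares `price` as TEXT because the feed delivers it as a JSON string, which is why the query casts before averaging. The Python sketch below mirrors the filter, cast, and average steps for a single window. The sample messages are made up to match the two declared columns and are not real market data.

```python
import json


def average_ticker_price(raw_messages):
    """Average the price of 'ticker' messages; returns None if there are none."""
    prices = []
    for raw in raw_messages:
        msg = json.loads(raw)
        if msg.get("type") != "ticker":     # WHERE type = 'ticker'
            continue
        prices.append(float(msg["price"]))  # CAST(price AS FLOAT): price is a string
    return sum(prices) / len(prices) if prices else None


# Made-up messages shaped like the two declared columns; not real market data.
messages = [
    '{"type": "subscriptions"}',
    '{"type": "ticker", "price": "100.50"}',
    '{"type": "ticker", "price": "101.50"}',
]
print(average_ticker_price(messages))
```

Messages of other types carry no `price`, so filtering on `type` before the cast avoids trying to convert a missing value.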

first_pipeline

CREATE TABLE nexmark with (
    connector = 'nexmark',
    event_rate = '100'
);

-- SELECT * from nexmark where auction is not null;

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) AS row_num
    FROM (SELECT count(*) AS count, bid.auction AS auction,
        hop(interval '2 seconds', interval '60 seconds') AS window
            FROM nexmark WHERE bid is not null
            GROUP BY 2, window)) WHERE row_num <= 5;
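
The query above is a top-N-per-window pattern: count bids per auction in each window, rank auctions within the window by count, and keep the first five. A minimal Python sketch of the ranking step, with made-up rows and `n=2` to keep the output short:

```python
from collections import defaultdict


def top_n_per_window(rows, n=5):
    """rows: iterable of (window, auction, count).

    Returns {window: [(auction, count), ...]} holding the n highest counts.
    """
    by_window = defaultdict(list)
    for window, auction, count in rows:
        by_window[window].append((auction, count))  # PARTITION BY window
    return {
        # ORDER BY count DESC, then keep row_num <= n
        w: sorted(items, key=lambda ac: ac[1], reverse=True)[:n]
        for w, items in by_window.items()
    }


# Made-up aggregated rows: (window label, auction id, bid count)
rows = [
    ("w1", 1001, 7),
    ("w1", 1002, 3),
    ("w1", 1003, 9),
    ("w2", 1001, 4),
]
top = top_n_per_window(rows, n=2)
for window, ranked in top.items():
    print(window, ranked)
```

Ranking happens independently inside each window, so an auction can be in the top list for one window and absent from the next.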

create_table_updating

CREATE TABLE nexmark with (
    connector = 'nexmark',
    event_rate = '100'
);

CREATE TABLE bids (
    auction    BIGINT,
    bidder     BIGINT,
    channel    VARCHAR,
    url        VARCHAR,
    datetime   DATETIME,
    avg_price  BIGINT
) WITH (
    connector = 'filesystem',
    type = 'sink',
    path = '/home/data',
    format = 'parquet',
    parquet_compression = 'zstd',
    rollover_seconds = 60,
    time_partition_pattern = '%Y/%m/%d/%H',
    partition_fields = 'bidder'
);

-- SELECT bid from nexmark where bid is not null;

INSERT INTO bids
SELECT
    bid.auction, bid.bidder, bid.channel, bid.url, bid.datetime, bid.price as avg_price
FROM
    nexmark
where
    bid is not null;

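
To get a feel for `time_partition_pattern = '%Y/%m/%d/%H'`, the sketch below expands the same pattern with Python's `strftime`, assuming the sink interprets it as a strftime-style format. The full directory layout it prints, including the `bidder=<value>` segment for `partition_fields`, is an assumption for illustration only. Check the files written under `/home/data` for the actual structure.

```python
from datetime import datetime, timezone


def time_partition(ts: datetime, pattern: str = "%Y/%m/%d/%H") -> str:
    """Expand a strftime-style pattern for one event time."""
    return ts.strftime(pattern)


def example_directory(base: str, ts: datetime, bidder: int) -> str:
    """Hypothetical layout: base path, time partition, then a field partition.

    The 'bidder=<value>' segment is an assumed convention, not confirmed here.
    """
    return f"{base}/{time_partition(ts)}/bidder={bidder}"


ts = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
print(time_partition(ts))
print(example_directory("/home/data", ts, 42))
```

With an hourly pattern and `rollover_seconds = 60`, each hour's directory would collect many small files, one or more per minute per partition.
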
CREATE TABLE nexmark with (
    connector = 'nexmark',
    event_rate = '100'
);

SELECT avg(bid.price) as avg_price
FROM nexmark
WHERE bid IS NOT NULL
GROUP BY hop(interval '2 seconds', interval '10 seconds');

TODO