Experiment with:
Install the rpk CLI to use as a Kafka CLI
brew install redpanda-data/tap/redpanda
# add zsh completions
rpk generate shell-completion zsh > "${fpath[1]}/_rpk"
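Completions are picked up by new shells; to load them in the current one:
# rebuild zsh's completion cache
autoload -U compinit && compinit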
Create an rpk profile to connect to the local Redpanda Kafka cluster
rpk profile create local \
-s brokers=localhost:19092 \
-s registry.hosts=localhost:8081 \
-s admin.hosts=localhost:9644
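To verify the profile (assuming the Docker stack below is already running):
# select the profile if it isn't already active
rpk profile use local
# print the cluster ID and broker list over the Kafka API
rpk cluster info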
Install the psql CLI on macOS
brew install libpq
# Finally, symlink psql (and other libpq tools) into /usr/local/bin:
brew link --force libpq
# connect to the local database
psql "postgresql://postgres:postgres@localhost/postgres?sslmode=require"
First-time setup
# pull Docker images for all services, including the optional profile
docker compose --profile optional pull
docker compose up
# or include the optional services:
# docker compose --profile optional up
docker compose ps
open http://localhost:5115/ # Arroyo Console
open http://localhost:8080/ # Redpanda Console
open http://localhost:8081/subjects # Redpanda Registry
docker compose down
# (DANGER) - shutdown and delete volumes
docker compose down -v
Benthos example
# start the Benthos (Redpanda Connect) service
docker compose up connect
docker compose down
# (DANGER) - shutdown and delete volumes
docker compose down -v
This will start the following services (a quick health check sketch follows the list):
- Postgres Database
- Kafka - Redpanda or Bufstream
- Redpanda Console
- Redpanda Connect (optional)
- MinIO (optional)
- ClickHouse (optional)
- Arroyo
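A minimal health check, assuming the rpk profile created above (admin API on localhost:9644):
# query broker health via the Redpanda admin API
rpk cluster health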
Add new topics
Tip
You can also use Redpanda Console to create topics.
rpk topic list
rpk topic create -r 1 -p 1 customer-source
rpk topic create -r 1 -p 1 customer-sink
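Confirm the partition and replica counts took effect:
rpk topic describe customer-source
rpk topic describe customer-sink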
Then, from the Arroyo Console, create a pipeline with:
Warning
By default, preview doesn't write to sinks to avoid accidentally writing bad data. You can run the pipeline for real by clicking "Launch", or you can enable sinks in preview.
CREATE TABLE customer_source (
name TEXT,
age INT,
phone TEXT
) WITH (
connector = 'kafka',
format = 'json',
type = 'source',
bootstrap_servers = 'redpanda:9092',
topic = 'customer-source'
);
CREATE TABLE customer_sink (
count BIGINT,
age INT
) WITH (
connector = 'kafka',
format = 'json',
type = 'sink',
bootstrap_servers = 'redpanda:9092',
topic = 'customer-sink'
);
-- preview: count customers per age over a 10-second window sliding every 2 seconds
SELECT count(*), age
FROM customer_source
GROUP BY age, hop(interval '2 seconds', interval '10 seconds');
-- write the same aggregation to the sink topic
INSERT INTO customer_sink SELECT count(*), age
FROM customer_source
GROUP BY age, hop(interval '2 seconds', interval '10 seconds');
Publish a couple of messages to the customer-source topic using Redpanda Console, e.g.:
Important
Use TYPE: JSON
{
"name": "sumo",
"age": 70,
"phone": "111-222-4444"
}
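If you prefer the terminal, the same record can be produced with rpk (one JSON document per line):
echo '{"name": "sumo", "age": 70, "phone": "111-222-4444"}' | \
  rpk topic produce customer-source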
Check for new messages in the customer-sink topic.
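Or tail it from the terminal:
# read the next few records from the sink topic
rpk topic consume customer-sink -n 5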
Try more examples in Arroyo Console: http://localhost:5115/
basic_tumble_aggregate
CREATE TABLE nexmark WITH (
connector = 'nexmark',
event_rate = 10
);
-- count bids per auction in 1-second tumbling windows
SELECT
bid.auction as auction,
tumble(INTERVAL '1' second) as window,
count(*) as count
FROM
nexmark
where
bid is not null
GROUP BY
1,
2;
bitcoin_exchange_rate
CREATE TABLE coinbase (
type TEXT,
price TEXT
) WITH (
connector = 'websocket',
endpoint = 'wss://ws-feed.exchange.coinbase.com',
subscription_message = '{
"type": "subscribe",
"product_ids": [
"BTC-USD"
],
"channels": ["ticker"]
}',
format = 'json'
);
SELECT avg(CAST(price as FLOAT)) from coinbase
WHERE type = 'ticker'
GROUP BY hop(interval '5' second, interval '1 minute');
first_pipeline
CREATE TABLE nexmark with (
connector = 'nexmark',
event_rate = '100'
);
-- SELECT * from nexmark where auction is not null;
-- top 5 auctions by bid count over a 60-second window sliding every 2 seconds
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY window
ORDER BY count DESC) AS row_num
FROM (SELECT count(*) AS count, bid.auction AS auction,
hop(interval '2 seconds', interval '60 seconds') AS window
FROM nexmark WHERE bid is not null
GROUP BY 2, window)) WHERE row_num <= 5;
create_table_updating
CREATE TABLE nexmark with (
connector = 'nexmark',
event_rate = '100'
);
CREATE TABLE bids (
auction BIGINT,
bidder BIGINT,
channel VARCHAR,
url VARCHAR,
datetime DATETIME,
avg_price BIGINT
) WITH (
connector = 'filesystem',
type = 'sink',
path = '/home/data',
format = 'parquet',
parquet_compression = 'zstd',
rollover_seconds = 60,
time_partition_pattern = '%Y/%m/%d/%H',
partition_fields = 'bidder'
);
-- SELECT bid from nexmark where bid is not null;
INSERT INTO bids
SELECT
bid.auction, bid.bidder, bid.channel, bid.url, bid.datetime, bid.price as avg_price
FROM
nexmark
where
bid is not null;
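After a rollover interval or two, the Parquet output should appear under /home/data inside the container running the Arroyo workers; the service name below is an assumption, so adjust it to your compose file:
# list the partitioned parquet output (assumes a compose service named "arroyo")
docker compose exec arroyo ls -R /home/data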
CREATE TABLE nexmark with (
connector = 'nexmark',
event_rate = '100'
);
SELECT avg(bid.price) as avg_price
FROM nexmark
WHERE bid IS NOT NULL
GROUP BY hop(interval '2 seconds', interval '10 seconds');
Next steps:
- Try Redpanda Iceberg Topics for SQL-based analytics with zero ETL
- Build a Streaming CDC Pipeline with MinIO and Redpanda into Snowflake
- SQLFlow - enables SQL-based stream processing, powered by DuckDB.