Tansu is an Apache Kafka API compatible broker with a Postgres storage engine. Acting as a drop-in replacement, existing clients connect to Tansu, producing and fetching messages stored in Postgres. Tansu is in early development, licensed under the GNU AGPL. Written in async 🦀 Rust 🚀.
While retaining API compatibility, the current Postgres storage engine is very different from Apache Kafka's:
- Messages are not stored in segments, so that retention and compaction policies can be applied immediately.
- Message ordering is total over all topics, not restricted to a single topic partition (see the sketch after this list).
- Brokers do not replicate messages, relying on continuous archiving instead.
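As a hedged sketch of what total ordering means here, using the record schema shown below: every message takes its id from a single bigserial sequence, so ordering by it interleaves all topics and partitions into one sequence.
-- illustration only: record.id is shared by every topic and partition,
-- so ordering by it yields a single total order across the cluster
select record.id, topic.name, record.partition
from record
join topic on record.topic = topic.id
order by record.id;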
Our initial use cases are relatively low-volume Kafka deployments where total message ordering could be useful. Other non-functional requirements might require a different storage engine. Tansu has been designed to work with multiple storage engines, which are also in development:
- A Postgres engine where message ordering is either per topic, or per topic partition (as in Kafka).
- An object store for S3 or compatible services.
- A segmented disk store (as in Kafka with broker replication).
We store a Kafka message using the following record schema:
create table record (
  id bigserial primary key not null,
  topic uuid references topic(id),
  partition integer,
  producer_id bigint,
  sequence integer,
  timestamp timestamp,
  k bytea,
  v bytea,
  last_updated timestamp default current_timestamp not null,
  created_at timestamp default current_timestamp not null
);
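As an illustration of the produce path, here is a hedged sketch of the insert for a single message against this schema; the cluster and topic names are hypothetical, and the broker's actual statements may differ:
-- a sketch, not Tansu's actual produce statement
insert into record (topic, partition, producer_id, sequence, timestamp, k, v)
select topic.id, 0, 1001, 0, now(), 'greeting'::bytea, 'hello world'::bytea
from cluster, topic
where
  cluster.name = 'demo' -- hypothetical cluster name
  and topic.name = 'test'
  and topic.cluster = cluster.id
returning id; -- the bigserial id, which the fetch below compares against an offset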
The k and v columns are the key and value being stored by the client. The SQL used for a fetch looks like:
with sized as (
  select
    record.id,
    timestamp,
    k,
    v,
    sum(coalesce(length(k), 0) + coalesce(length(v), 0))
      over (order by record.id) as bytes
  from cluster, record, topic
  where
    cluster.name = $1
    and topic.name = $2
    and record.partition = $3
    and record.id >= $4
    and topic.cluster = cluster.id
    and record.topic = topic.id
)
select * from sized where bytes < $5;
One of the parameters of the Kafka Fetch API is the maximum number of bytes to return. We use a with query (a common table expression) to keep a running total of the size of each record, restricting the result set to the records that fit within that limit.
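To see how the running total enforces the limit, here is a minimal, self-contained sketch using made-up row sizes rather than real messages:
with sized as (
  select id, len, sum(len) over (order by id) as bytes
  from (values (1, 100), (2, 200), (3, 300)) as t (id, len)
)
select * from sized where bytes < 450;
-- returns rows 1 and 2 (running totals 100 and 300); row 3 would reach 600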
Tansu is available as a minimal from scratch docker image, with a compose.yaml to get everything running:
docker compose up
Using the regular Apache Kafka CLI, you can create topics, then produce and consume messages with Tansu:
kafka-topics \
  --bootstrap-server localhost:9092 \
  --partitions=3 \
  --replication-factor=1 \
  --create --topic test
Producer:
echo "hello world" | kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic test
Consumer:
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning \
  --property print.timestamp=true \
  --property print.key=true \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.value=true
Or using librdkafka to produce:
echo "Lorem ipsum dolor..." | \
./examples/rdkafka_example -P \
-t test \
-b localhost:9092 \
-z gzip
Consumer:
./examples/rdkafka_example \
  -C \
  -t test \
  -b localhost:9092