Tansu

Tansu is an Apache Kafka API compatible broker with a Postgres storage engine. As a drop-in replacement, Tansu lets existing clients connect unchanged, producing and fetching messages stored in Postgres. Tansu is in early development, licensed under the GNU AGPL. Written in async 🦀 Rust 🚀.

While retaining API compatibility, the current Postgres storage engine is very different from Apache Kafka's:

  • Messages are not stored in segments, so retention and compaction policies can be applied immediately (see the sketch after this list).
  • Message ordering is total over all topics, and not restricted to a single topic partition.
  • Brokers do not replicate messages, relying on continuous archiving instead.
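
With no segment files to rotate, retention can be a plain SQL delete, applied as soon as the policy fires. A sketch only, assuming a time-based retention policy against the record schema shown below, and not necessarily the statement Tansu itself issues:

delete from record
where created_at < current_timestamp - interval '7 days';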

Our initial use cases are relatively low-volume Kafka deployments where total message ordering could be useful. Other non-functional requirements might call for a different storage engine. Tansu has been designed to work with multiple storage engines, which are also in development:

  • A Postgres engine where message ordering is either per topic, or per topic partition (as in Kafka).
  • An object store for S3 or compatible services.
  • A segmented disk store (as in Kafka with broker replication).

We store a Kafka message using the following record schema:

create table record (
  id bigserial primary key not null,  -- monotonic across all topics: the total order
  topic uuid references topic(id),
  partition integer,
  producer_id bigint,  -- idempotent producer id from the Kafka protocol
  sequence integer,  -- producer sequence number, used for deduplication
  timestamp timestamp,
  k bytea,  -- message key
  v bytea,  -- message value
  last_updated timestamp default current_timestamp not null,
  created_at timestamp default current_timestamp not null
);
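
As a rough illustration, and not necessarily the exact statement Tansu issues, producing a single message might map onto an insert like this, with the bigserial id supplying the total order described above:

insert into record (topic, partition, producer_id, sequence, timestamp, k, v)
values ($1, $2, $3, $4, $5, $6, $7)
returning id;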

The k and v columns hold the key and value supplied by the client. The SQL used for a fetch looks like:

with sized as (
  select
    record.id,
    timestamp,
    k,
    v,
    sum(coalesce(length(k), 0) + coalesce(length(v), 0))
      over (order by record.id) as bytes
  from cluster, record, topic
  where
    cluster.name = $1
    and topic.name = $2
    and record.partition = $3
    and record.id >= $4
    and topic.cluster = cluster.id
    and record.topic = topic.id
) select * from sized where bytes < $5;

One of the parameters of the Kafka Fetch API is the maximum number of bytes to return. We use a WITH query here to restrict the size of the result set, keeping a running total of the message sizes.
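
The running total comes from the window function: each row carries the cumulative size of itself and every earlier row, so filtering on that column caps the batch. A standalone illustration of the technique, using literal values rather than the Tansu schema:

with sized as (
  select id,
         sum(bytes) over (order by id) as total
  from (values (1, 100), (2, 200), (3, 300)) as example (id, bytes)
)
select * from sized where total < 450;

Rows 1 and 2 are returned (running totals of 100 and 300 bytes); including row 3 would take the batch to 600, past the limit.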

Tansu is available as a minimal from-scratch Docker image, with a compose.yaml available in the repository:

docker compose up

Using the regular Apache Kafka CLI, you can create topics, and produce and consume messages with Tansu:

kafka-topics \
  --bootstrap-server localhost:9092 \
  --partitions=3 \
  --replication-factor=1 \
  --create --topic test

Producer:

echo "hello world" | kafka-console-producer \
    --bootstrap-server localhost:9092 \
    --topic test

Consumer:

kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning \
  --property print.timestamp=true \
  --property print.key=true \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.value=true

Or using librdkafka to produce:

echo "Lorem ipsum dolor..." | \
  ./examples/rdkafka_example -P \
  -t test \
  -b localhost:9092 \
  -z gzip

Consumer:

./examples/rdkafka_example \
  -C \
  -t test \
  -b localhost:9092