/kafkas

async kafka client in pure Rust

Primary language: Rust · License: MIT

kafkas

Async kafka client in pure Rust.

Features

  • Multiple async runtimes (tokio, async-std, etc.)
  • All versions of kafka are supported
  • Compression (gzip, snappy, lz4)

APIs

  • Producer
  • Consumer
  • Streams
  • Connect
  • Admin client

Usage

[dependencies]
kafkas = { git = "https://github.com/iamazy/kafkas", branch = "main" }

To get started using kafkas:

  • Producer
// Producer example: connects to the broker at 127.0.0.1:9092 and sends
// "hello kafka" records to the `kafka` topic.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?;

    let producer = Producer::new(client, ProducerOptions::default()).await?;

    // `producer.send(..).await?` hands back a further future (`ret`). Those
    // futures are pushed onto this channel and awaited by a spawned task, so
    // the send loop below never waits on them itself; failures are logged.
    let (mut tx, mut rx) = futures::channel::mpsc::unbounded();
    tokio::task::spawn(Box::pin(async move {
        while let Some(fut) = rx.next().await {
            if let Err(e) = fut.await {
                error!("{e}");
            }
        }
    }));

    let topic = topic_name("kafka");
    for _ in 0..100_000_000 {
        let record = TestData::new("hello kafka");
        let ret = producer.send(&topic, record).await?;
        // A failed hand-off to the background task is deliberately ignored.
        let _ = tx.send(ret).await;
    }

    // `main` is declared to return `Result`, so it must end with a value.
    Ok(())
}
  • Consumer
// Consumer example: connects to the broker at 127.0.0.1:9092, subscribes to
// the `kafka` topic and prints every record value together with its offset.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?;

    // Auto-commit is switched off here, which is why the loop below commits
    // explicitly after each batch of records.
    let mut consumer_options = ConsumerOptions::new("default");
    consumer_options.auto_commit_enabled = false;

    let mut consumer = Consumer::new(client, consumer_options).await?;

    let consume_stream = consumer.subscribe::<&str, ConsumerRecord>(vec!["kafka"]).await?;
    pin_mut!(consume_stream);

    while let Some(records) = consume_stream.next().await {
        for record in records {
            // Records without a value are skipped.
            if let Some(value) = record.value {
                println!("{:?} - {}", String::from_utf8(value.to_vec())?, record.offset);
            }
        }
        // needed only when `auto_commit_enabled` is false
        consumer.commit_async().await?;
    }

    // `main` is declared to return `Result`, so it must end with a value.
    Ok(())
}

Examples

Examples can be found in the `examples` directory.

Flame graph

flamegraph

Rust version requirements

The Rust version used for kafkas development is 1.65.

Acknowledgments