Library/micro framework to create streaming applications with Kafka

Primary language: Python. License: Apache License 2.0.

Kstreams

kstreams is a library/micro framework to use with Kafka. It provides a simple Kafka Streams implementation that gives certain guarantees; see below.


Documentation: https://kpn.github.io/kstreams/


Installation

pip install kstreams

Usage

import asyncio
from kstreams import create_engine, Stream


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
async def consume(stream: Stream):
    async for cr in stream:
        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    for _ in range(5):
        # Send through the engine instance, to the same topic the stream consumes
        metadata = await stream_engine.send("local--kstream", value=payload)
        print(f"Message sent: {metadata}")
        await asyncio.sleep(3)


async def main():
    await stream_engine.start()
    await produce()
    await stream_engine.stop()

if __name__ == "__main__":
    asyncio.run(main())
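
The usage example sends the payload as raw JSON bytes and prints the consumed value as-is. As a minimal sketch of the encoding and decoding step an application might wrap around that value, the helpers below use only the standard library. The names `encode_payload` and `decode_payload` are hypothetical and are not part of the kstreams API.

```python
import json
from typing import Any, Dict


def encode_payload(data: Dict[str, Any]) -> bytes:
    """Serialize a dict to UTF-8 encoded JSON bytes (hypothetical helper)."""
    return json.dumps(data).encode("utf-8")


def decode_payload(raw: bytes) -> Dict[str, Any]:
    """Parse UTF-8 encoded JSON bytes back into a dict (hypothetical helper)."""
    return json.loads(raw.decode("utf-8"))


# Round trip: the bytes produced here match the payload used in the example above
payload = encode_payload({"message": "Hello world!"})
event = decode_payload(payload)
print(event["message"])
```

In the example above, `encode_payload` could build the `payload` passed to `send`, and `decode_payload` could be applied to `cr.value` inside the stream function, assuming the value arrives as bytes.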

Features

  • Produce events
  • Consume events with Streams
  • Prometheus metrics and custom monitoring
  • TestClient
  • Custom Serialization and Deserialization
  • Easy to integrate with any async framework. Not tied to any library!
  • Yield events from streams
  • Store (kafka streams pattern)
  • Stream Join
  • Windowing

Development

This repo requires the use of poetry instead of pip. Note: if you want the virtualenv to live in the same path as the project, first run poetry config --local virtualenvs.in-project true

To install the dependencies, run:

poetry install

Then you can activate the virtualenv with:

poetry shell

Run tests:

./scripts/test

Run code formatting (black and isort):

./scripts/format

Commit messages

We use Conventional Commits for commit messages.

The use of commitizen is recommended. Commitizen is part of the dev dependencies. To create a commit with it, run:

cz commit