dramatiq_kafka

A dramatiq broker that can be used with Apache Kafka.

This package uses the kafka-python package to implement a Kafka broker for dramatiq. Configure the broker and register it with dramatiq:

import dramatiq

from dramatiq.middleware import AgeLimit, TimeLimit, Callbacks, Pipelines, Prometheus, Retries
from dramatiq_kafka import KafkaBroker

broker = KafkaBroker(
    bootstrap_servers="localhost:9092",
    topic="testdramatiq_topic",  # optional, default is `default`
    group_id="testdramatiq_consumergroup",  # optional, default is `default`
    middleware=[
        Prometheus(),
        AgeLimit(),
        TimeLimit(),
        Callbacks(),
        Pipelines(),
        Retries(min_backoff=1000, max_backoff=900000, max_retries=96),
    ],
)
dramatiq.set_broker(broker)
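
The `Retries` settings above are given in milliseconds, so `min_backoff=1000` is one second and `max_backoff=900000` is fifteen minutes. As a rough illustration of what those numbers imply, the sketch below assumes a plain doubling schedule capped at `max_backoff`. The helper `retry_delays_ms` is hypothetical and is not part of dramatiq or dramatiq_kafka. dramatiq's own backoff also applies random jitter, so these values are best read as upper bounds.

```python
def retry_delays_ms(min_backoff, max_backoff, max_retries):
    """Return an upper-bound delay (ms) for each retry.

    Assumption: the delay doubles on every attempt and is capped at
    max_backoff. Jitter is deliberately ignored.
    """
    return [
        min(min_backoff * 2 ** attempt, max_backoff)
        for attempt in range(max_retries)
    ]


delays = retry_delays_ms(min_backoff=1000, max_backoff=900000, max_retries=96)

print(delays[:3])                # first delays: [1000, 2000, 4000]
print(delays.index(900000))      # first attempt that hits the cap: 10
print(sum(delays) / 3_600_000)   # total waiting time in hours
```

Under that assumption the cap is reached on the eleventh retry, and a message that fails every time is retried for a little under 22 hours before it is given up on.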

Use dramatiq to send a message to a Kafka topic (queue_name) as usual:

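    from dramatiq import Message, get_broker

    # Build the message by hand; queue_name is the Kafka topic to publish to.
    message = Message(
        queue_name="rita",
        actor_name="email_customer",
        args=("testing@gmail.com",),
        kwargs={},
        options={},
    )

    # Fetch the broker registered with dramatiq.set_broker() and publish.
    broker = get_broker()
    broker.enqueue(message)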

Write tasks as described in django_dramatiq's documentation (i.e. define dramatiq actors in each app's tasks.py file) and modify the settings as shown in django_example to get started. For an example of sending messages, see the views.py file.
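
For orientation, a Django settings fragment might look roughly like the following. This is a sketch based on django_dramatiq's documented `DRAMATIQ_BROKER` setting, not a copy of the settings in django_example, which may differ. It assumes that `KafkaBroker` can be referenced by the dotted path `dramatiq_kafka.KafkaBroker` and that the `OPTIONS` entries are passed to it as keyword arguments.

```python
# settings.py (hypothetical sketch; check django_example for the real values)

INSTALLED_APPS = [
    "django_dramatiq",  # listed before the project's own apps
    # ... the project's own apps ...
]

DRAMATIQ_BROKER = {
    # Assumed dotted path, derived from `from dramatiq_kafka import KafkaBroker`.
    "BROKER": "dramatiq_kafka.KafkaBroker",
    # Assumed to be forwarded to KafkaBroker as keyword arguments.
    "OPTIONS": {
        "bootstrap_servers": "localhost:9092",
        "topic": "testdramatiq_topic",
        "group_id": "testdramatiq_consumergroup",
    },
    # Middleware is listed by dotted path rather than as instances.
    "MIDDLEWARE": [
        "dramatiq.middleware.Prometheus",
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}
```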