An event bus library that unifies multiple queue backends behind a single API

Rhubarb

Rhubarb is a library that exposes realtime event streaming from several backends through a single API. It currently supports Postgres, Kafka, RabbitMQ, and Redis, as well as an in-memory backend that is useful for testing.

Installation

There are a number of backends that can be used with Rhubarb:

  • Kafka: pip install rhubarb-py[kafka]
  • Postgres: pip install rhubarb-py[postgres]
  • Redis: pip install rhubarb-py[redis]
  • RabbitMQ: pip install rhubarb-py[rabbitmq]

Backends

  • Rhubarb("redis://localhost:6379/0")
  • Rhubarb("kafka://localhost:9092")
  • Rhubarb("postgres://postgres:postgres@localhost:5432/rhubarb")
  • Rhubarb("amqp://guest:guest@localhost/")
  • Rhubarb("memory://")
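
The list above suggests that the backend is chosen from the URL scheme. The sketch below illustrates that idea using only the standard library; `BACKENDS` and `backend_for` are hypothetical names made up for this example and are not part of Rhubarb's API.

```python
from urllib.parse import urlparse

# Hypothetical mapping from URL scheme to a backend label. The labels are
# illustrative only and are not Rhubarb's internal class names.
BACKENDS = {
    "redis": "redis",
    "kafka": "kafka",
    "postgres": "postgres",
    "amqp": "rabbitmq",
    "memory": "memory",
}


def backend_for(url: str) -> str:
    """Return the backend label implied by the scheme of a connection URL."""
    scheme = urlparse(url).scheme
    try:
        return BACKENDS[scheme]
    except KeyError:
        raise ValueError(f"unsupported backend URL scheme: {scheme!r}") from None
```

Under this assumption, switching a test suite to the in-memory backend would only require changing the URL to "memory://".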

Quick start

Simple event consumer

async with Rhubarb("redis://localhost:6379/0") as events:
    async with events.subscribe(channel="CHATROOM") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

Simple event producer

async with Rhubarb("redis://localhost:6379/0") as events:
    await events.publish(channel="CHATROOM", message="test message")

History retrieval

async with Rhubarb("redis://localhost:6379/0") as events:
    async with events.subscribe(channel="CHATROOM", history=10) as subscriber: # read the last 10 events published to the channel
        async for event in subscriber:
            await websocket.send_text(event.message)
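
To make the meaning of history=10 concrete, here is a toy model of a channel that keeps a bounded backlog and replays the most recent events to a new subscriber. It is a sketch of the semantics only, assuming a simple in-process list of messages; `ChannelHistory` is a hypothetical class and says nothing about how any Rhubarb backend stores events.

```python
from collections import deque
from typing import List


class ChannelHistory:
    """Toy in-memory channel that remembers a bounded backlog of messages."""

    def __init__(self, max_backlog: int = 100) -> None:
        # deque(maxlen=...) silently discards the oldest entry when full.
        self._backlog = deque(maxlen=max_backlog)

    def publish(self, message: str) -> None:
        self._backlog.append(message)

    def replay(self, history: int) -> List[str]:
        """Return up to `history` most recent messages, oldest first."""
        if history <= 0:
            return []
        return list(self._backlog)[-history:]
```

A subscriber that asks for more history than the backlog holds simply receives everything that is still retained.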

Custom serializer & deserializer

async with Rhubarb("redis://localhost:6379/0", serializer=json.dumps, deserializer=json.loads) as events:
    async with events.subscribe(channel="CHATROOM", history=10) as subscriber: # read the last 10 events published to the channel
        async for event in subscriber:
            await websocket.send_text(event.message)
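
A serializer and deserializer only work as a pair if one undoes the other. The sketch below wraps json.dumps and json.loads in two hypothetical helpers, `serialize` and `deserialize`, and checks the round trip. Whether a given backend carries str or bytes on the wire is not stated here, so treat the str return type as an assumption.

```python
import json
from typing import Any


def serialize(payload: Any) -> str:
    """Encode a JSON-compatible payload as compact text with stable key order."""
    return json.dumps(payload, separators=(",", ":"), sort_keys=True)


def deserialize(raw: str) -> Any:
    """Decode text produced by serialize() back into Python objects."""
    return json.loads(raw)


message = {"user": "alice", "text": "hello"}
wire = serialize(message)

# The pair must round-trip, otherwise subscribers see different data
# from what the publisher sent.
assert deserialize(wire) == message
```

Helpers like these could be passed in the same way as json.dumps and json.loads in the snippet above, for example serializer=serialize, deserializer=deserialize.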

Group subscribing (at-most-once processing)

async with Rhubarb("redis://localhost:6379/0", serializer=json.dumps, deserializer=json.loads) as events:
    async with events.subscribe(
        "TEST-GROUP-CHANNEL", group_name="TEST_GROUP", consumer_name="sub_1"
    ) as subscriber_1:
        async for event in subscriber_1:
            await process_job(event)
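
The idea behind group subscribing is that consumers in the same group share the work: each event is handed to one consumer and is not redelivered. The sketch below models that with a shared asyncio.Queue. It illustrates the delivery semantics only and is not how Rhubarb or any of its backends implement groups; `worker` and `run_group` are hypothetical names.

```python
import asyncio


async def worker(name, queue, seen):
    """Take events from the shared queue until a None sentinel arrives."""
    while True:
        event = await queue.get()
        if event is None:
            return
        # Once taken from the queue, the event is never handed out again.
        seen.append((name, event))
        await asyncio.sleep(0)  # yield so other consumers get a turn


async def run_group(events, consumers):
    """Distribute events across the consumers of one group."""
    queue = asyncio.Queue()
    seen = []
    tasks = [asyncio.create_task(worker(name, queue, seen)) for name in consumers]
    for event in events:
        await queue.put(event)
    for _ in consumers:
        await queue.put(None)  # one sentinel per consumer
    await asyncio.gather(*tasks)
    return seen


deliveries = asyncio.run(
    run_group(["job-1", "job-2", "job-3", "job-4"], ["sub_1", "sub_2"])
)
```

Every job appears in `deliveries` once, paired with whichever consumer picked it up. Which consumer handles which job depends on scheduling and should not be relied on.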

Example

A minimal working example can be found in the example directory.

import os

from starlette.applications import Starlette
from starlette.concurrency import run_until_first_complete
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute

from rhubarb import Rhubarb

URL = os.environ.get("URL", "redis://localhost:6379/0")

events = Rhubarb(URL)

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


async def homepage(_):
    return HTMLResponse(html)


async def room_consumer(websocket):
    async for message in websocket.iter_text():
        await events.publish(channel="chatroom", message=message)


async def room_producer(websocket):
    async with events.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


async def ws(websocket):
    await websocket.accept()
    await run_until_first_complete(
        (room_consumer, {"websocket": websocket}),
        (room_producer, {"websocket": websocket}),
    )


routes = [
    Route("/", homepage),
    WebSocketRoute("/ws", ws, name="chatroom_ws"),
]


app = Starlette(
    routes=routes,
    on_startup=[events.connect],
    on_shutdown=[events.disconnect],
)

🛡 License

This project is licensed under the terms of the MIT license. See LICENSE for more details.