/aioevents

A simple library for managing events through an asynchronous queue

Primary LanguagePythonApache License 2.0Apache-2.0

aioevents

https://travis-ci.com/mpyatishev/aioevents.svg?branch=master

A simple library for managing events through an asynchronous queue

Installation

pip install aioevents-ng

Usage example

import asyncio

from dataclasses import dataclass

import aioevents


@dataclass
class MyEvent(aioevents.Event):
   payload: str


@aioevents.manager.register(MyEvent)
async def event_hadler(event: aioevents.Event):
   print(f"recieved: {event}")


async def produce():
   async with aioevents.events as events:
      await events.publish(MyEvent("Hello!"))


async def main():
   aioevents.start(asyncio.get_event_loop())

   await produce()

   print('stopping worker')
   aioevents.stop()

   # wait for all coroutines
   await asyncio.sleep(1)


if __name__ == "__main__":
   asyncio.run(main())

License

aioevents library is offered under Apache 2 license.