taskiq

Distributed task queue with full async support


Documentation: https://taskiq-python.github.io/

What is taskiq?

Taskiq is an asynchronous distributed task queue for Python. It takes inspiration from established projects such as Celery and Dramatiq. However, taskiq can send and run both sync and async functions, and it integrates with popular async frameworks such as FastAPI and AioHTTP.

We also use PEP 612 (parameter specification variables) so that editors can offer accurate autocompletion for task signatures. All code is type-hinted.

Installation

This project can be installed using pip:

pip install taskiq

Or it can be installed directly from git:

pip install git+https://github.com/taskiq-python/taskiq

Usage

First, you need to create a broker. A broker is an object that communicates with workers through distributed queues.

We have different brokers for different queue backends, including NATS, Redis, RabbitMQ, Kafka and more. Choose the one that fits your setup and create an instance.

from taskiq_nats import JetStreamBroker

broker = JetStreamBroker("nats://localhost:4222", queue="my_queue")

Declaring tasks is as easy as declaring a function. Just add a decorator to your function and you are ready to go.

import asyncio

from taskiq_nats import JetStreamBroker

broker = JetStreamBroker("nats://localhost:4222", queue="my_queue2")


@broker.task
async def my_task(a: int, b: int) -> None:
    print("AB", a + b)


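async def main() -> None:
    # Connect to the queue backend before sending anything.
    await broker.startup()

    # Send the task to the broker; a worker will execute it.
    await my_task.kiq(1, 2)

    # Close the connection once the message has been sent.
    await broker.shutdown()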


if __name__ == "__main__":
    asyncio.run(main())
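To make the flow in the example above more concrete, here is a minimal in-process sketch of the same idea: the client serializes a call and puts it on a queue, and a worker reads the message, looks up the function by name, and runs it. This is only an illustration built on asyncio.Queue. The names TASKS, send, worker and demo are hypothetical and are not part of taskiq's API, and a real broker uses an external backend such as NATS instead of an in-memory queue.

```python
import asyncio
import contextlib
import json
from typing import Any, Awaitable, Callable

# Hypothetical registry mapping task names to functions (not taskiq's API).
TASKS: dict[str, Callable[..., Awaitable[Any]]] = {}


async def add(a: int, b: int) -> int:
    return a + b


TASKS["add"] = add


async def send(queue: "asyncio.Queue[str]", name: str, *args: Any) -> None:
    """Client side: serialize the call and put it on the queue."""
    await queue.put(json.dumps({"task": name, "args": list(args)}))


async def worker(queue: "asyncio.Queue[str]", results: list) -> None:
    """Worker side: read messages, look up the task by name, and run it."""
    while True:
        message = json.loads(await queue.get())
        results.append(await TASKS[message["task"]](*message["args"]))
        queue.task_done()


async def demo() -> list:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    results: list = []
    worker_task = asyncio.create_task(worker(queue, results))
    await send(queue, "add", 1, 2)
    await queue.join()  # wait until the worker has processed the message
    worker_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await worker_task
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is that the sender and the worker share only the queue and the task name, which is why both sides need to import the module where the task is declared.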

The message is sent to the broker, which delivers it to a worker. The worker then executes the function. To start worker processes, run the following command:

taskiq worker path.to.the.module:broker

Here, path.to.the.module is the import path of the module where the broker is defined, and broker is the name of the broker variable.
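The module:variable form of that argument can be illustrated with a short sketch. The resolve function below is a hypothetical helper written for this example, not taskiq's own code. It assumes the argument is split on the colon, the left part is imported as a module, and the right part is read as an attribute of that module.

```python
import importlib
from typing import Any


def resolve(spec: str) -> Any:
    """Import the module before the colon and return the attribute after it."""
    module_path, _, variable = spec.partition(":")
    if not module_path or not variable:
        raise ValueError(f"expected 'module.path:variable', got {spec!r}")
    module = importlib.import_module(module_path)
    return getattr(module, variable)


if __name__ == "__main__":
    # Standard-library stand-in for "path.to.the.module:broker".
    dumps = resolve("json:dumps")
    print(dumps({"a": 1}))
```

In practice this means the module must be importable from the directory where the worker command is run.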

If you have tasks in different modules, you can ask taskiq to automatically import them by passing the --fs-discover flag:

taskiq worker path.to.the.module:broker --fs-discover

This imports all modules named tasks.py in the current directory and all of its subdirectories.

Workers also support hot reload. To enable it, pass the --reload flag, and the worker will reload when the code changes. To use it, install taskiq with the reload extra, e.g. pip install taskiq[reload].

Taskiq also integrates with popular async frameworks such as FastAPI and AioHTTP. You can use these integrations to reuse dependencies from your web app in your tasks.
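As a rough picture of what this kind of discovery involves, the sketch below walks a directory tree and turns every tasks.py file into a dotted module name. The function find_task_modules is hypothetical and is not taskiq's implementation; see the documentation for the exact behaviour of the flag.

```python
from pathlib import Path


def find_task_modules(root: Path, pattern: str = "tasks.py") -> list:
    """Return dotted module names for every file named `pattern` under root."""
    modules = []
    for path in sorted(root.rglob(pattern)):
        # Drop the ".py" suffix and join the path parts with dots.
        relative = path.relative_to(root).with_suffix("")
        modules.append(".".join(relative.parts))
    return modules


if __name__ == "__main__":
    for name in find_task_modules(Path.cwd()):
        print(name)
```

Each name returned this way could then be passed to importlib.import_module so that the task decorators in those files run and register their tasks.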

Read about all features in our documentation: https://taskiq-python.github.io/

Local development

Linting

We use pre-commit to do linting locally.

After cloning this project, please install pre-commit. It helps fix files before committing changes.

pre-commit install

Testing

Tests run with pytest and need no additional setup or options.

pytest

Docs

To run docs locally, you need to install yarn.

First, you need to install dependencies.

yarn install

After that you can set up a docs server by running:

yarn docs:dev