Send and receive messages by using SQS queues.
pip install comlink
poetry add comlink
from comlink import SqsConsumer, SqsQueue, signal_event
async def example(queue_url, sqs_client):
# Create a queue object
sqs_queue = SqsQueue(url=queue_url, client=sqs_client)
# Event for stopping the consumer by receiving os signal.
# stop_event waits for SIGINT(Ctrl+C) or SIGTERM by default
stop_event = signal_event()
# Create a consumer with a handler that just prints the message
consumer = SqsConsumer(queue=sqs_queue, handler=print)
# Start the consumer
consumer_task = await consumer.start(stop_event=stop_event)
# Send a message to the queue
await sqs_queue.put("Hello, world!")
# Wait for the consumer to stop by receiving os signal
await consumer_task
More examples can be found in the examples directory.
- coroutine put(data, **kwargs)
- coroutine take(max_messages, visibility_timeout, wait_time_seconds, **kwargs)
- remove(receipt_handle, **kwargs)
It is a wrapper around aiobotocore's SQS client. It provides a simple interface for sending and receiving messages.
In most cases, you will need to pass a instance of this class to the comlink.SqsConsumer
and send messages to it using the put
method.
take
and remove
methods are used by the SqsConsumer
class. But you can use them if you need to.
Parameters:
url: str
- URL of the queueclient: Any
- aiobotocore's SQS clientserializer: Callable[[Message], str] | None
- a function that serializes the message to stringdeserializer: Callable[[str], Message] | None
- a function that deserializes the message from string
import json
from comlink import SqsQueue
async def example(queue_url, sqs_client):
sqs_queue = SqsQueue(url=queue_url, client=sqs_client, serializer=json.dumps, deserializer=json.loads)
await sqs_queue.put({"hello": "world"})
message = await sqs_queue.take(max_messages=1, visibility_timeout=10, wait_time_seconds=0)
print(message[0]["Body"])
await sqs_queue.remove(message[0]["ReceiptHandle"])
Sends a message to the queue.
Parameters:
data: Message
- message data. Ifserializer
is provided, it will be called withdata
as an argument.kwargs
- additional arguments for aiobotocore'ssend_message
method. E.g.DelaySeconds
,MessageGroupId
,MessageDeduplicationId
and etc.
Returns result of aiobotocore's send_message
method.
Get messages from the queue.
Parameters:
max_messages: int
- maximum number of messages to receivevisibility_timeout: int
- the duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by atake
requestwait_time_seconds: int
- the duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner thanwait_time_seconds
. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.kwargs
- additional arguments for aiobotocore'sreceive_message
method. E.g.AttributeNames
,MessageAttributeNames
and etc.
Returns list of messages that were received from the queue.
Deletes the specified message from the queue.
Parameters:
receipt_handle: str
- the receipt handle associated with the message to delete (fromtake
method)
Returns result of aiobotocore's delete_message
method.
It is a consumer that receives messages from the queue and passes them to the handler.
Init function only creates a consumer object. To start the consumer, you need to call the start
method.
Parameters:
queue: SqsQueue
- queue objecthandler: Callable
- a function that will be called with a message as an argument. It may be a regular function, a coroutine or an class instance with__call__
method.batch_size: int
- maximum number of messages to receive from the queue at oncevisibility_timeout: int
- the duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by acomlink.SqsQueue.take
requestwait_time_seconds: int
- the duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner thanwait_time_seconds
.
from comlink import SqsQueue, SqsConsumer, signal_event
async def handler(message: str):
print("Received message:", message)
async def example(sqs_queue: SqsQueue):
stop_event = signal_event()
consumer = SqsConsumer(queue=sqs_queue, handler=handler)
consumer_task = await consumer.start(stop_event=stop_event)
await consumer_task
Starts the consumer.
Parameters:
stop_event: asyncio.Event
- a consumer will stop when this event is set. You can usecomlink.signal_event
function to create an event that will be set when the process receives a signal.
Returns a consumer task. You can use it to wait for the consumer to stop. If the consumer is stopped by an exception, the task will raise an exception. If the consumer is stopped by the stop_event
, the task will return None
.
Creates an event that will be set when the process receives a signal.
Parameters:
signals: dict[int, str] | None
- a dictionary that maps signal numbers to signal names. IfNone
, the default dictionary will be used.log_level: int
- a log level that will be used to log signals. Default islogging.WARNING
.loop: asyncio.AbstractEventLoop | None
- an event loop. IfNone
, the current event loop will be used.
Returns an asyncio.Event object.
import logging
import signal
from comlink import signal_event
async def example():
stop_event = signal_event(
signals={
signal.SIGUSR1: "SIGUSR1",
signal.SIGINT: "SIGINT",
signal.SIGTERM: "SIGTERM",
},
log_level=logging.DEBUG,
)
await stop_event.wait()
print("Received a signal")
- Install Poetry.
- Install dependencies with
poetry install
. - Install Docker.
- Run
docker compose -f docker-compose.dev.yml up -d
to start the development environment (localstack). Tests will fail until the environment is up and running.
Run poetry run pytest
to run the tests.
Run poetry run black .
to format the code.
Run poetry run isort .
to sort the imports.