faust-streaming/faust

Concurrency issue with topics

kpister opened this issue · 4 comments

I have a task listening on a topic, and I send messages to that topic from the app's web endpoint.
The task is declared with concurrency=2, but it does not handle multiple records at the same time.

Below I have provided an example app (run with faust -A example_app worker -l INFO -p 5001) with a /count/ endpoint that increments and returns a counter. For demonstration purposes I have put a time.sleep(5) in the task handler, but this could be any slow operation.

Workflow:

Hit the count endpoint twice, in quick succession.

curl -X "POST" "http://localhost:5001/count/"

curl -X "POST" "http://localhost:5001/count/"

Expected outcome:

The logs should look like this:

Endpoint hit
Added message queue
Example Task: {'message': 'example', 'count': 2}
Endpoint hit
Added message queue
Done with Example Task: {'message': 'example', 'count': 2}
Example Task: {'message': 'example', 'count': 3}
Done with Example Task: {'message': 'example', 'count': 3}

Actual output

Notice the timestamps: even though the two curl requests were sent back-to-back, the second message is not picked up until the first handler's five-second sleep has finished.

[2023-02-28 13:54:41,620] [30988] [INFO] Endpoint hit
[2023-02-28 13:54:41,620] [30988] [INFO] Added message queue
[2023-02-28 13:54:41,666] [30988] [INFO] Example Task: {'message': 'example', 'count': 2}
[2023-02-28 13:54:46,678] [30988] [INFO] Endpoint hit
[2023-02-28 13:54:46,680] [30988] [INFO] Added message queue
[2023-02-28 13:54:46,764] [30988] [INFO] Example Task: {'message': 'example', 'count': 3}

Versions

  • Python version 3.10.9
  • Faust version 0.9.5 and 0.10.4
  • Operating system Windows
  • Kafka version Unknown

Code

import logging
import time
import faust
from aiokafka.helpers import create_ssl_context
from faust.sensors.datadog import Monitor
from faust.web import Request, Response, View
from faust.types.auth import AuthProtocol

logger = logging.getLogger(__name__)
broker_credentials = faust.SASLCredentials(...)
broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    "streaming",
    version=1,
    broker="...",
    value_serializer="json",
    broker_credentials=broker_credentials,
    partition_assignment_strategy="CooperativeStickyAssignor",
    monitor=Monitor(),
)


TOPIC_NAME = "example"
example_topic = app.topic(TOPIC_NAME, value_type=str, partitions=None)

@app.agent(example_topic, concurrency=2)
async def example_task(records):
    async for key, record in records.items():
        logger.info(f"Example Task: {record}")

        # Simulates slow network IO. time.sleep() is synchronous,
        # so it blocks the entire event loop while it runs.
        time.sleep(5)
        logger.info(f"Done with Example Task: {record}")


@app.page("/count/")
class CountPage(View):
    count: int = 0

    async def post(self, request: Request) -> Response:
        self.count += 1
        msg = {"message": "example", "count": self.count }

        logger.info(f"Endpoint hit")
        await example_topic.send(key=TOPIC_NAME, value=msg)
        logger.info(f"Added message queue")

        return self.json({"count": self.count})


if __name__ == "__main__":
    app.main()

I think you are blocking the event loop with a synchronous sleep.

I see what you mean; if we use await asyncio.sleep(5) instead of time.sleep(5), the results have the expected ordering.

When time.sleep(5) is called, it blocks the entire thread: the script is put on hold, frozen, doing nothing, and the event loop cannot run anything else. When you call await asyncio.sleep(5) instead, the coroutine yields control back to the event loop, which can run other tasks while the sleep completes.
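For reference, applying that fix to the example agent only changes the sleep call; the rest of the app stays as above:

import asyncio

@app.agent(example_topic, concurrency=2)
async def example_task(records):
    async for key, record in records.items():
        logger.info(f"Example Task: {record}")

        # Non-blocking sleep: control returns to the event loop,
        # so the second concurrent actor can make progress.
        await asyncio.sleep(5)
        logger.info(f"Done with Example Task: {record}")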

Okay, so to solve my actual problem I'll need to switch the network call to an async client instead of whatever blocking call it currently makes. This is very helpful, thank you!
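If the real client library is synchronous and has no async equivalent, one workaround (a sketch, not from this thread; blocking_network_call is a hypothetical stand-in for the real client call) is to run the blocking call in a worker thread with asyncio.to_thread, which keeps the event loop free:

import asyncio

def blocking_network_call(record):
    # Hypothetical placeholder for the real synchronous client call.
    ...

@app.agent(example_topic, concurrency=2)
async def example_task(records):
    async for key, record in records.items():
        logger.info(f"Example Task: {record}")

        # Run the blocking call in a worker thread; the event loop
        # (and the other concurrent actor) keeps running meanwhile.
        await asyncio.to_thread(blocking_network_call, record)
        logger.info(f"Done with Example Task: {record}")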