a2aproject/a2a-python

[Bug]: EventQueue can't dequeue messages in time

fwwucn opened this issue · 4 comments

What happened?

I integrated the a2a-python SDK with the Smolagents framework, using streaming output. The agent streams messages in real time, and the messages are enqueued in TestAgentExecutor.execute, but dequeuing only starts after all of them have been enqueued.

Here is my test.py code (replace the LLM model id, API key, and base URL with yours):

import logging
import time
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TaskState,
    TaskStatus,
    TaskStatusUpdateEvent,
)
from a2a.utils import new_agent_text_message, new_task
from fastapi import FastAPI
from smolagents import CodeAgent, OpenAIServerModel, WebSearchTool
from smolagents.memory import ActionStep, FinalAnswerStep
from smolagents.models import ChatMessageStreamDelta
from typing_extensions import override


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class TestAgentExecutor(AgentExecutor):
    def __init__(self):
        model = OpenAIServerModel(
            model_id="{YOUR_MODEL_ID}",
            api_base="{YOUR_API_BASE}",
            api_key="{YOUR_API_KEY}")

        self.agent = CodeAgent(model=model,
            tools=[WebSearchTool()],
            stream_outputs=True)

    @override
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        query = context.get_user_input()
        task = context.current_task

        if not context.message:
            raise Exception('No message provided')

        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)

        start = time.time()
        try:
            for message in self.agent.run(query, stream=True):
                if isinstance(message, ChatMessageStreamDelta):
                    text = message.content
                elif isinstance(message, ActionStep):
                    text = message.model_output
                elif isinstance(message, FinalAnswerStep):
                    text = message.output
                else:
                    continue  # skip message types that carry no text, so text is never unbound
                print(f"***{time.time() - start}: {text}")
                await self._send_working_message(context, event_queue, text)
        except Exception as e:
            logger.error(f"Error in streaming output: {str(e)}")
        finally:
            await self._send_final_answer(context, event_queue)

    async def _send_working_message(self, context, event_queue, text):
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                append=True,
                status=TaskStatus(
                    state=TaskState.working,
                    message=new_agent_text_message(
                        text,
                        context.context_id,
                        context.task_id,
                    ),
                ),
                final=False,
                contextId=context.context_id,
                taskId=context.task_id,
            )
        )

    async def _send_final_answer(self, context, event_queue):
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                status=TaskStatus(state=TaskState.completed),
                final=True,
                contextId=context.context_id,
                taskId=context.task_id,
            )
        )

    @override
    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise Exception("Cancel not supported")


if __name__ == "__main__":
    agent_url = f"/api/v1/users/1/a2a/"

    skill = AgentSkill(
        id="assistant_agent_skill",
        name="I can do everything for you",
        description="I can assist you with a wide range of tasks, from chatting to calling tools.",
        tags=["assistant", "chat"],
        examples=["What's the weather in Shanghai today?"],
    )

    agent_card = AgentCard(
        name="Assistant Agent",
        description="Your personal assistant agent",
        url="http://localhost:5000" + agent_url,
        version="1.0.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=AgentCapabilities(streaming=True),
        skills=[skill],
    )

    request_handler = DefaultRequestHandler(
        agent_executor=TestAgentExecutor(),
        task_store=InMemoryTaskStore(),
    )

    server = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler,
    )

    app = FastAPI()
    app.mount(agent_url, server.build())

    import uvicorn

    config = uvicorn.Config(app, host="0.0.0.0", port=5000)
    server = uvicorn.Server(config)
    server.run()

Run the code: python test.py 2>&1 | tee -a test.log &

Test with cURL:

curl --request POST \
  --url http://localhost:5000/api/v1/users/1/a2a/ \
  --data '{"id":"1234","jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"Recommend a science fiction movie"}],"messageId":"user123"}}}'
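
To watch the events arrive incrementally on the wire, the A2A protocol also defines a message/stream method that responds with Server-Sent Events. Assuming this SDK version exposes it (an assumption; the test above uses message/send), a streaming variant of the same request might look like:

curl --no-buffer --request POST \
  --url http://localhost:5000/api/v1/users/1/a2a/ \
  --header 'Content-Type: application/json' \
  --data '{"id":"1234","jsonrpc":"2.0","method":"message/stream","params":{"message":{"role":"user","parts":[{"kind":"text","text":"Recommend a science fiction movie"}],"messageId":"user123"}}}'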

Symptom:
Stream outputs (the log messages starting with ***) begin at 1.142s and are enqueued as they are produced.
But the first "Dequeued event (waited) of type: <class 'a2a.types.Task'>" appears very late, always after "Closing EventQueue", at line 4802 of the log (an excerpt is attached below).

Relevant log output

***1.1423838138580322: Thought
DEBUG:a2a.utils.telemetry:Start async tracer
DEBUG:a2a.server.events.event_queue:Enqueuing event of type: <class 'a2a.types.TaskStatusUpdateEvent'>

...

DEBUG:a2a.utils.telemetry:Start async tracer
DEBUG:a2a.server.events.event_queue:Enqueuing event of type: <class 'a2a.types.TaskStatusUpdateEvent'>
DEBUG:a2a.utils.telemetry:Start async tracer
DEBUG:a2a.server.events.event_queue:Enqueuing event of type: <class 'a2a.types.TaskStatusUpdateEvent'>
DEBUG:a2a.utils.telemetry:Start async tracer
DEBUG:a2a.server.events.event_queue:Closing EventQueue.
DEBUG:a2a.server.events.event_queue:Dequeued event (waited) of type: <class 'a2a.types.Task'>
DEBUG:a2a.server.events.event_consumer:Dequeued event of type: <class 'a2a.types.Task'> in consume_all.
DEBUG:a2a.server.events.event_queue:Marking task as done in EventQueue.
DEBUG:a2a.server.events.event_consumer:Marked task as done in event queue in consume_all
DEBUG:a2a.server.tasks.task_manager:Processing save of task event of type Task for task_id: 2aee89ae-96eb-4294-be25-3b761d7bbf75
DEBUG:a2a.server.tasks.task_manager:Saving task with id: 2aee89ae-96eb-4294-be25-3b761d7bbf75
DEBUG:a2a.server.tasks.inmemory_task_store:Task 2aee89ae-96eb-4294-be25-3b761d7bbf75 saved successfully.

Code of Conduct

  • I agree to follow this project's Code of Conduct

After I monkey patched EventQueue.enqueue_event, it works like a charm.
I appended these three lines after self.queue.put (the for loop over self._children already existed):

    tasks = [asyncio.create_task(self.queue.join())]
    for child in self._children:
        tasks.append(asyncio.create_task(child.close()))
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

Finally it became:

async def enqueue_event(self, event: Event) -> None:
    """Enqueues an event to this queue and all its children.

    Args:
        event: The event object to enqueue.
    """
    async with self._lock:
        if self._is_closed:
            logger.warning('Queue is closed. Event will not be enqueued.')
            return

    logger.debug(f'Enqueuing event of type: {type(event)}')

    # Make sure to use put instead of put_nowait to avoid blocking the event loop.
    await self.queue.put(event)
    tasks = [asyncio.create_task(self.queue.join())]
    for child in self._children:
        await child.enqueue_event(event)
        tasks.append(asyncio.create_task(child.close()))
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
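
My understanding of why this changes the timing (plain asyncio reasoning, not something stated in the SDK docs): awaiting queue.join() suspends the producer until the consumer has called task_done() for every pending item, which hands control to the event loop, and therefore to the consumer, after every enqueue instead of only once the whole run has finished. A minimal sketch of that effect:

import asyncio

async def main():
    q = asyncio.Queue()

    async def consumer():
        while True:
            print('consumed', await q.get())
            q.task_done()

    c = asyncio.create_task(consumer())
    for i in range(3):
        await q.put(i)
        # join() suspends until the consumer calls task_done(),
        # so each item is consumed right after it is produced
        await q.join()
    c.cancel()

asyncio.run(main())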

What do you think of this fix?

It looks like self.agent.run in your example above runs synchronously and blocks the event loop. Can you check whether there is an async version of run, or move the run call to a background thread?
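
A minimal illustration of that starvation with plain asyncio (independent of the SDK's EventQueue): a producer doing blocking work between puts never yields to the loop, so the consumer only runs once the producer has completely finished:

import asyncio
import time

async def producer(q):
    for i in range(3):
        time.sleep(0.5)   # stands in for the synchronous agent.run() work
        await q.put(i)    # put() on a non-full Queue has no yield point either
        print('enqueued', i)

async def consumer(q):
    while True:
        print('dequeued', await q.get())

async def main():
    q = asyncio.Queue()
    task = asyncio.create_task(consumer(q))
    await producer(q)
    await asyncio.sleep(0)  # first real yield; all 'dequeued' lines print here
    task.cancel()

asyncio.run(main())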

https://huggingface.co/docs/smolagents/main/examples/async_agent talks about this behavior and has suggestions on how to make the agent async.
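
If Smolagents has no async variant of run, one possible workaround (a sketch under that assumption; iterate_in_thread and _DONE are hypothetical names, not Smolagents or SDK API) is to advance the blocking generator on a worker thread so the event loop stays free to dequeue:

import asyncio

_DONE = object()  # sentinel marking generator exhaustion

async def iterate_in_thread(sync_gen):
    """Async-iterate a blocking generator; each next() runs in a worker thread."""
    while True:
        item = await asyncio.to_thread(next, sync_gen, _DONE)
        if item is _DONE:
            return
        yield item

The loop in execute() would then become:

    async for message in iterate_in_thread(self.agent.run(query, stream=True)):
        ...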

This issue can also be observed in the "langgraph" sample.

In test_client.py, keep only the streaming section, then run both the server and the client; the issue becomes clearly observable: all events arrive on the client side in a single batch after the server has pushed them. This happens even with the latest a2a-sdk, 0.3.3.

===
The patch above resolves the issue.
Thank you, @fwwucn.
