a2aproject/a2a-python

[Bug]: Streaming endpoint `/a2a/person-agent/v1/message:stream` hangs indefinitely without explicit request.body() consumption

youngchannelforyou opened this issue ยท 0 comments

Description

When sending requests to the streaming endpoint /a2a/person-agent/v1/message:stream, the code hangs indefinitely at body = await request.body() in rest_handler.py. However, the issue is resolved when explicitly consuming the request body by uncommenting await request.body() in rest_adapter.py.

Environment

  • a2a-sdk version: 3.2
  • Python version: 3.11
  • Platform: MacOS

Test Code

import asyncio
import json
import logging
from uuid import uuid4

import httpx

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


async def test_a2a_json(
    base_url: str = "http://localhost:3300",
    agent_path: str = "/a2a/person-agent/v1/message:stream",
    timeout_seconds: float = 30.0,
) -> bool:
    full_url = f"{base_url}{agent_path}"
    logger.info(f"๐Ÿš€ A2A JSON test start: {full_url}")

    user_message = {
        "messageId": uuid4().hex,
        "role": 1,
        "content": [{"text": "Tell me in detail about Yuna Kim's major achievements"}],
        "metadata": {},
    }

    request_data = {
        "message": user_message,
        "configuration": {},
    }
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-Request-ID": uuid4().hex,
    }

    try:
        async with httpx.AsyncClient() as http_client:
            response = await http_client.post(
                full_url,
                json=request_data,
                headers=headers,
                timeout=timeout_seconds,
            )
            response.raise_for_status()
            try:
                result = response.json()
                logger.info(
                    f"๐Ÿ“ Response:\n{json.dumps(result, ensure_ascii=False, indent=2)}"
                )
            except json.JSONDecodeError as e:
                logger.error(f"โŒ JSON parse failed: {e}")
                logger.info(f"๐Ÿ“„ Raw response: {response.text}")
                return False
            logger.info("โœ… Request succeeded")
            return True
    except Exception as e:
        logger.error(f"โŒ Request error: {type(e).__name__}: {e}")
        return False


async def main():
    logger.info("=" * 80)
    logger.info("A2A Protocol v1 PersonAgent async JSON test start")
    logger.info("=" * 80)
    success = await test_a2a_json()
    if success:
        logger.info("๐ŸŽ‰ Test succeeded")
    else:
        logger.error("๐Ÿ’ฅ Test failed")


if __name__ == "__main__":
    asyncio.run(main())

Issue Details

  1. When using the endpoint /a2a/person-agent/v1/message:send, everything works normally.
  2. When using /a2a/person-agent/v1/message:stream, the code hangs indefinitely.
  3. The issue is resolved by uncommenting await request.body() in rest_adapter.py:
@rest_stream_error_handler
async def _handle_streaming_request(
    self,
    method: Callable[[Request, ServerCallContext], AsyncIterable[Any]],
    request: Request,
) -> EventSourceResponse:
    print("11111 @@@@@@@@@@@@@@@@@")
    # await request.body()  # Uncommenting this line resolves the issue
    print("11111 @@@@@@@@@@@@@@@@@")
    call_context = self._context_builder.build(request)
    print("22222 @@@@@@@@@@@@@@@@@")
    print(f"์ปจํ…์ŠคํŠธ ์ •๋ณด: {call_context}")
    print("22222 @@@@@@@@@@@@@@@@@")

    async def event_generator(
        stream: AsyncIterable[Any],
    ) -> AsyncIterator[dict[str, dict[str, Any]]]:
        print("33333 @@@@@@@@@@@@@@@@@")
        print(stream)
        print("33333 @@@@@@@@@@@@@@@@@")
        async for item in stream:
            print("44444 @@@@@@@@@@@@@@@@@")
            print(f"์‘๋‹ต ์•„์ดํ…œ: {item}")
            print("44444 @@@@@@@@@@@@@@@@@")
            yield {'data': item}

    return EventSourceResponse(
        event_generator(method(request, call_context))
    )
  1. The code hangs at body = await request.body() in rest_handler.py:
async def on_message_send_stream(
    self,
    request: Request,
    context: ServerCallContext,
) -> AsyncIterator[str]:
    print(1)
    body = await request.body()  # Code hangs here
    print(2)
    params = a2a_pb2.SendMessageRequest()
    print(3)
    # ...rest of the code

Root Cause Analysis

This is an event loop blocking issue in the asynchronous flow:

  1. Request Body Consumption:

    • In Starlette/FastAPI, request.body() can only be consumed once.
    • When consumed, the body is cached for subsequent calls.
    • If not consumed in advance, the first attempt to consume it happens inside rest_handler.py
  2. Asynchronous Flow Interruption:

    • When the streaming response has already begun with EventSourceResponse
    • Then trying to consume the request body inside on_message_send_stream
    • This causes the event loop to deadlock since the streaming response context is already active
  3. Solution Mechanism:

    • Pre-consuming the body in rest_adapter.py ensures it's cached
    • When rest_handler.py later calls request.body(), it gets the cached version without blocking