[Bug]: Streaming endpoint `/a2a/person-agent/v1/message:stream` hangs indefinitely without explicit request.body() consumption
youngchannelforyou opened this issue ยท 0 comments
youngchannelforyou commented
Description
When sending requests to the streaming endpoint /a2a/person-agent/v1/message:stream, the code hangs indefinitely at body = await request.body() in rest_handler.py. However, the issue is resolved when explicitly consuming the request body by uncommenting await request.body() in rest_adapter.py.
Environment
- a2a-sdk version: 3.2
- Python version: 3.11
- Platform: MacOS
Test Code
import asyncio
import json
import logging
from uuid import uuid4
import httpx
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
async def test_a2a_json(
base_url: str = "http://localhost:3300",
agent_path: str = "/a2a/person-agent/v1/message:stream",
timeout_seconds: float = 30.0,
) -> bool:
full_url = f"{base_url}{agent_path}"
logger.info(f"๐ A2A JSON test start: {full_url}")
user_message = {
"messageId": uuid4().hex,
"role": 1,
"content": [{"text": "Tell me in detail about Yuna Kim's major achievements"}],
"metadata": {},
}
request_data = {
"message": user_message,
"configuration": {},
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"X-Request-ID": uuid4().hex,
}
try:
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
full_url,
json=request_data,
headers=headers,
timeout=timeout_seconds,
)
response.raise_for_status()
try:
result = response.json()
logger.info(
f"๐ Response:\n{json.dumps(result, ensure_ascii=False, indent=2)}"
)
except json.JSONDecodeError as e:
logger.error(f"โ JSON parse failed: {e}")
logger.info(f"๐ Raw response: {response.text}")
return False
logger.info("โ
Request succeeded")
return True
except Exception as e:
logger.error(f"โ Request error: {type(e).__name__}: {e}")
return False
async def main():
logger.info("=" * 80)
logger.info("A2A Protocol v1 PersonAgent async JSON test start")
logger.info("=" * 80)
success = await test_a2a_json()
if success:
logger.info("๐ Test succeeded")
else:
logger.error("๐ฅ Test failed")
if __name__ == "__main__":
asyncio.run(main())Issue Details
- When using the endpoint
/a2a/person-agent/v1/message:send, everything works normally. - When using
/a2a/person-agent/v1/message:stream, the code hangs indefinitely. - The issue is resolved by uncommenting
await request.body()inrest_adapter.py:
@rest_stream_error_handler
async def _handle_streaming_request(
self,
method: Callable[[Request, ServerCallContext], AsyncIterable[Any]],
request: Request,
) -> EventSourceResponse:
print("11111 @@@@@@@@@@@@@@@@@")
# await request.body() # Uncommenting this line resolves the issue
print("11111 @@@@@@@@@@@@@@@@@")
call_context = self._context_builder.build(request)
print("22222 @@@@@@@@@@@@@@@@@")
print(f"์ปจํ
์คํธ ์ ๋ณด: {call_context}")
print("22222 @@@@@@@@@@@@@@@@@")
async def event_generator(
stream: AsyncIterable[Any],
) -> AsyncIterator[dict[str, dict[str, Any]]]:
print("33333 @@@@@@@@@@@@@@@@@")
print(stream)
print("33333 @@@@@@@@@@@@@@@@@")
async for item in stream:
print("44444 @@@@@@@@@@@@@@@@@")
print(f"์๋ต ์์ดํ
: {item}")
print("44444 @@@@@@@@@@@@@@@@@")
yield {'data': item}
return EventSourceResponse(
event_generator(method(request, call_context))
)- The code hangs at
body = await request.body()inrest_handler.py:
async def on_message_send_stream(
self,
request: Request,
context: ServerCallContext,
) -> AsyncIterator[str]:
print(1)
body = await request.body() # Code hangs here
print(2)
params = a2a_pb2.SendMessageRequest()
print(3)
# ...rest of the codeRoot Cause Analysis
This is an event loop blocking issue in the asynchronous flow:
-
Request Body Consumption:
- In Starlette/FastAPI,
request.body()can only be consumed once. - When consumed, the body is cached for subsequent calls.
- If not consumed in advance, the first attempt to consume it happens inside
rest_handler.py
- In Starlette/FastAPI,
-
Asynchronous Flow Interruption:
- When the streaming response has already begun with
EventSourceResponse - Then trying to consume the request body inside
on_message_send_stream - This causes the event loop to deadlock since the streaming response context is already active
- When the streaming response has already begun with
-
Solution Mechanism:
- Pre-consuming the body in
rest_adapter.pyensures it's cached - When
rest_handler.pylater callsrequest.body(), it gets the cached version without blocking
- Pre-consuming the body in