openai/openai-agents-python

Usage always zero in streamed agent response

Closed this issue · 5 comments

I'm using Runner.run_streamed(...).stream_events() to stream responses from an agent. However, after the stream completes, result_stream.context_wrapper.usage always returns zero. I confirmed that the stream is fully consumed before accessing usage. Is this expected behavior?

result_stream = Runner.run_streamed(agent, prompt, context=context, session=session)
async for event in result_stream.stream_events():
    ...  # streaming logic
usage = result_stream.context_wrapper.usage  # always zero, even after the stream is fully consumed

FULL CODE

async def process_user_query(self, user_query: str):
    await self.session.add_items([{"role": "user", "content": user_query}])
    last_messages = await self.session.get_items(limit=4, user_query=user_query)
    temp_session = MemoryManagerSession("router_temp_" + self.session_id)
    await temp_session.add_items(last_messages)

    query_type = await self.detect_query_type(user_query)
    context = AgentContext(principal_ids=self.principal_ids, query_type=query_type)
    prompt = f"User Query (answer this): {user_query}\n"
    router_agent = self.master_agent.agent

    result_stream = Runner.run_streamed(router_agent, prompt, context=context, session=self.session)

    accumulated_output = ""
    tool_used = None
    agent_name = "MasterAgent"
    tool_call_events = []
    tool_call_results = []
    tool_calls_pending = False

    try:
        async for event in result_stream.stream_events():
            if event.type == "agent_updated_stream_event":
                agent_name = getattr(event.new_agent, "name", agent_name)
            elif event.type == "run_item_stream_event":
                if getattr(event.item, "type", None) == "tool_call_item":
                    # Run items use the "tool_call_item"/"tool_call_output_item"
                    # type literals; the tool name lives on the item's raw_item.
                    tool_used = getattr(getattr(event.item, "raw_item", None), "name", None)
                    tool_call_events.append(event)
                    tool_calls_pending = True
                elif getattr(event.item, "type", None) == "tool_call_output_item":
                    tool_call_results.append(event)
            elif event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                delta = event.data.delta
                if delta and isinstance(delta, str):
                    accumulated_output += delta
                    yield {"stream": True, "msg": delta, "agent": agent_name, "tool": tool_used}
    except Exception:
        pass

    synth_output = ""
    if tool_calls_pending:
        synth_stream = Runner.run_streamed(router_agent, prompt, context=context, session=self.session)
        try:
            async for event in synth_stream.stream_events():
                if event.type == "agent_updated_stream_event":
                    agent_name = getattr(event.new_agent, "name", agent_name)
                elif event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                    delta = event.data.delta
                    if delta and isinstance(delta, str):
                        synth_output += delta
                        yield {"stream": True, "msg": delta, "agent": agent_name, "tool": tool_used}
        except Exception:
            pass

    final_output = (accumulated_output + synth_output).strip()
    if not final_output:
        fallback_msg = (
            "⚠️ Sorry, something went wrong and I couldn't generate a response.\n"
            "You can try again with the same query, or refresh the session and try once more."
        )
        yield {"stream": False, "msg": fallback_msg, "agent": agent_name, "tool": tool_used}

    await self.session.add_items([{"role": "assistant", "content": final_output}])

    # Ensure stream is fully consumed before accessing usage
    usage = getattr(result_stream.context_wrapper, "usage", {})
    trace = getattr(result_stream, "trace", {})
    print("Usage:", usage)
    print("Trace:", trace)

Can you run the following code? You'll see that usage has data at the end:

import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

async def main():
    agent = Agent(
        name="Joker",
        instructions="You are a helpful assistant.",
    )
    result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
    print(result.context_wrapper.usage)

if __name__ == "__main__":
    asyncio.run(main())

If you're using a non-OpenAI model in some way, note that some models may not return usage data to the SDK.
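
One way to check what the provider actually sends back is to inspect the final raw response event; a minimal sketch (assuming the stream ends with a response.completed event that carries the provider-reported usage):

import asyncio

from agents import Agent, Runner

async def main():
    agent = Agent(
        name="Probe",
        instructions="You are a helpful assistant.",
    )
    result = Runner.run_streamed(agent, input="Say hi.")
    async for event in result.stream_events():
        # The last raw event wraps the completed response; its usage field
        # shows what the provider reported (it may be empty for providers
        # that omit usage in streaming mode).
        if event.type == "raw_response_event" and event.data.type == "response.completed":
            print("provider-reported usage:", event.data.response.usage)

if __name__ == "__main__":
    asyncio.run(main())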

This is happening when I use Gemini models (gemini-flash and gemini-flash-lite) via the LiteLLM extension.
The usage is always zero.

I am wondering if this is an issue that will get fixed.

The non-streaming flow for Gemini works fine, and usage is reported properly. In the streaming flow, though, usage always comes back as:

Usage(requests=0, input_tokens=0, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens=0, output_tokens_details=OutputTokensDetails(reasoning_tokens=0), total_tokens=0)
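
For reference, the non-streaming path that does report usage looks roughly like this (a sketch, assuming the same gemini/gemini-2.5-flash model via LitellmModel):

import asyncio
import os

from agents import Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel

async def main():
    agent = Agent(
        name="Assistant",
        instructions="You are a helpful assistant.",
        model=LitellmModel(model="gemini/gemini-2.5-flash", api_key=os.environ["GEMINI_API_KEY"]),
    )
    result = await Runner.run(agent, "Hello!")  # non-streaming run
    print(result.context_wrapper.usage)  # populated here, unlike the streamed run

if __name__ == "__main__":
    asyncio.run(main())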

@Ali-Olliek For Gemini models, you can receive usage data if you have model_settings=ModelSettings(include_usage=True):

import asyncio
import os

from agents import Agent, Runner, function_tool, set_tracing_disabled
from agents.extensions.models.litellm_model import LitellmModel
from agents.model_settings import ModelSettings

set_tracing_disabled(disabled=True)

@function_tool
def get_weather(city: str):
    print(f"[debug] getting weather for {city}")
    return f"The weather in {city} is sunny."

async def main():
    agent = Agent(
        name="Assistant",
        instructions="You only respond in haikus.",
        model=LitellmModel(model="gemini/gemini-2.5-flash", api_key=os.environ["GEMINI_API_KEY"]),
        model_settings=ModelSettings(include_usage=True),
        tools=[get_weather],
    )
    stream = Runner.run_streamed(agent, "What's the weather in Tokyo?")
    async for event in stream.stream_events():
        if event.type == "raw_response_event" and event.data.type == "response.output_text.delta":
            print(event.data.delta, end="", flush=True)

    print()
    print()
    print(f"usage: {stream.context_wrapper.usage}")

if __name__ == "__main__":
    asyncio.run(main())

Thank you @seratch <3

Thanks @seratch, that worked for me too.

I was using an OpenAI model via Azure OpenAI, and when I invoked it using AsyncAzureOpenAI with model_settings=ModelSettings(include_usage=True), it worked fine.
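
For reference, a sketch of that setup (the endpoint and deployment names are placeholders; OpenAIChatCompletionsModel is one way to attach an AsyncAzureOpenAI client):

import asyncio
import os

from openai import AsyncAzureOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel
from agents.model_settings import ModelSettings

# Placeholder Azure configuration; adjust to your resource and deployment.
azure_client = AsyncAzureOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version="2024-10-21",
)

async def main():
    agent = Agent(
        name="Assistant",
        instructions="You are a helpful assistant.",
        model=OpenAIChatCompletionsModel(model="gpt-4o", openai_client=azure_client),
        model_settings=ModelSettings(include_usage=True),
    )
    stream = Runner.run_streamed(agent, "Hello!")
    async for event in stream.stream_events():
        pass  # consume the stream
    print(f"usage: {stream.context_wrapper.usage}")

if __name__ == "__main__":
    asyncio.run(main())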