langchain-ai/langchain

ChatBedrockConverse does not emit on_chat_model_stream events in LangGraph astream_events()

Checked other resources

  • This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

#!/usr/bin/env python3
"""
Minimal Reproducible Example (MRE) for ChatBedrock vs ChatBedrockConverse streaming event issue.

This script demonstrates the difference in event emission between ChatBedrock and ChatBedrockConverse
when used with LangGraph's astream_events().

Expected behavior:
- ChatBedrock should emit 'on_chat_model_stream' events during streaming
- ChatBedrockConverse may emit different events or no streaming events

Run with: python debug_bedrock_streaming.py
"""

import asyncio
import os
from typing import Dict, Any, List

from langchain_aws import ChatBedrock, ChatBedrockConverse
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent


def create_react_agent_with_bedrock(model_client):
    """Create a LangGraph agent using create_react_agent (like the actual guest agent)."""

    # Simple system prompt
    system_prompt = """You are a helpful assistant. Answer questions concisely."""

    # Create agent using create_react_agent (same as guest agent)
    graph = create_react_agent(
        model=model_client,
        tools=[],  # No tools, just conversation
        prompt=system_prompt,
        name="test_agent",
    )

    return graph


async def test_streaming_events(graph, test_name: str) -> List[Dict[str, Any]]:
    """Test streaming events from a LangGraph and collect all events."""
    print(f"\n=== Testing {test_name} ===")

    events_collected = []
    test_message = "Hello, can you tell me about personal finance in one sentence?"

    try:
        async for event in graph.astream_events(
            {"messages": [HumanMessage(content=test_message)]},
            version="v1"
        ):
            event_type = event.get("event", "unknown")
            name = event.get("name", "unknown")

            print(f"Event: {event_type} | Name: {name}")

            # Collect all events for analysis
            events_collected.append({
                "event": event_type,
                "name": name,
                "data_keys": list(event.get("data", {}).keys()) if event.get("data") else []
            })

            # Specifically check for streaming events
            if event_type == "on_chat_model_stream":
                print("  ✅ FOUND: on_chat_model_stream event!")
                # Try to extract streaming content
                data = event.get("data", {})
                print(f"    Stream data keys: {list(data.keys())}")
                if "chunk" in data:
                    chunk = data["chunk"]
                    print(f"    Chunk type: {type(chunk)}")
                    if hasattr(chunk, "content"):
                        print(f"    Content: {repr(chunk.content)}")
                    else:
                        print(f"    Chunk repr: {repr(chunk)}")
            elif event_type == "on_chat_model_end":
                print("  ๐Ÿ“ FOUND: on_chat_model_end event (alternative to streaming)")
                # Try to extract final content
                data = event.get("data", {})
                if "output" in data:
                    output = data["output"]
                    if hasattr(output, "content"):
                        print(f"    Final content: {output.content}")
            elif event_type == "on_chain_stream":
                print("  ๐Ÿ”„ FOUND: on_chain_stream event")
                # This might contain streaming data too
                data = event.get("data", {})
                print(f"    Stream data keys: {list(data.keys())}")

                # Check if this contains the actual streaming content
                if "chunk" in data:
                    chunk = data["chunk"]
                    print(f"    Chunk type: {type(chunk)}")
                    print(f"    Chunk keys (if dict): {list(chunk.keys()) if isinstance(chunk, dict) else 'N/A'}")

                    # Deep inspection of chunk content
                    if isinstance(chunk, dict):
                        for key, value in chunk.items():
                            print(f"      {key}: {type(value)} = {repr(value)}")
                    elif hasattr(chunk, "content"):
                        content = chunk.content
                        print(f"    Chunk content: {repr(content)}")
                        if isinstance(content, list) and content:
                            # Handle structured content like in the guest service
                            for item in content:
                                if isinstance(item, dict) and "text" in item:
                                    text = item["text"]
                                    print(f"    Extracted text: '{text}'")
                        elif isinstance(content, str):
                            print(f"    String content: '{content}'")
                    else:
                        print(f"    Chunk repr: {repr(chunk)}")

    except Exception as e:
        print(f"โŒ Error during streaming: {e}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")
        return []

    return events_collected


async def test_direct_streaming():
    """Test direct streaming from ChatBedrockConverse without LangGraph."""
    from langchain_aws import ChatBedrockConverse
    from langchain_core.messages import HumanMessage

    print("\n๐Ÿ”ฌ TESTING DIRECT STREAMING (bypassing LangGraph)")
    print("=" * 50)

    model_id = "openai.gpt-oss-120b-1:0"
    region = "us-west-2"

    try:
        # Test with default parameters (no streaming parameter needed)
        chat_model = ChatBedrockConverse(
            model_id=model_id,
            region_name=region,
            temperature=0.4,
        )

        print("Testing direct astream() method...")
        messages = [HumanMessage(content="Hello, tell me about personal finance in one sentence")]

        chunks_received = 0
        full_content = ""
        async for chunk in chat_model.astream(messages):
            chunks_received += 1
            print(f"Chunk {chunks_received}: {type(chunk)}")

            # Extract content from chunk
            if hasattr(chunk, 'content'):
                content = chunk.content
                if isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict) and 'text' in item:
                            text = item.get('text', '')
                            if text:
                                print(f"  Text: '{text}'")
                                full_content += text
                        elif isinstance(item, str):
                            print(f"  String: '{item}'")
                            full_content += item
                elif isinstance(content, str):
                    print(f"  Content: '{content}'")
                    full_content += content
                else:
                    print(f"  Content: {repr(content)}")

            # Show response metadata if available
            if hasattr(chunk, 'response_metadata') and chunk.response_metadata:
                print(f"  Metadata: {chunk.response_metadata}")

            print(f"  Accumulated so far: '{full_content}'\n")

        print(f"\n๐Ÿ“Š DIRECT STREAMING RESULTS:")
        print(f"  - Chunks received: {chunks_received}")
        if chunks_received > 0:
            print("  โœ… SUCCESS: Direct streaming works!")
        else:
            print("  โŒ FAILED: No chunks received")

    except Exception as e:
        print(f"โŒ DIRECT STREAMING ERROR: {e}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")


async def main():
    """Main test function comparing both Bedrock clients."""

    # Configuration - CURRENT setup that you're actually using
    model_id = "openai.gpt-oss-120b-1:0"  # Current OpenAI model (only works with ChatBedrockConverse)
    region = "us-west-2"  # Current region

    print(f"Testing with model: {model_id}")
    print(f"Testing with region: {region}")
    print("NOTE: ChatBedrock doesn't support this OpenAI model, so we'll see AccessDenied for it")
    print()

    print("๐Ÿ” Testing Bedrock Streaming Event Emission")
    print("=" * 50)

    # Test 1: ChatBedrock (old implementation)
    print("\n--- Setting up ChatBedrock (old) ---")
    bedrock_old = ChatBedrock(
        model_id=model_id,
        region_name=region,
        temperature=0.4,
        streaming=True  # Explicitly enable streaming
    )

    graph_old = create_react_agent_with_bedrock(bedrock_old)
    events_old = await test_streaming_events(graph_old, "ChatBedrock (Old)")

    # Test 2: ChatBedrockConverse (new implementation)
    print("\n--- Setting up ChatBedrockConverse (new) ---")
    bedrock_new = ChatBedrockConverse(
        model_id=model_id,
        region_name=region,
        temperature=0.4,
        # Note: ChatBedrockConverse may handle streaming differently
    )

    graph_new = create_react_agent_with_bedrock(bedrock_new)
    events_new = await test_streaming_events(graph_new, "ChatBedrockConverse (New)")

    # Analysis
    print("\n" + "=" * 50)
    print("๐Ÿ“Š ANALYSIS")
    print("=" * 50)

    # Check for streaming events in both tests
    old_stream_events = [e for e in events_old if e["event"] == "on_chat_model_stream"]
    new_stream_events = [e for e in events_new if e["event"] == "on_chat_model_stream"]

    old_end_events = [e for e in events_old if e["event"] == "on_chat_model_end"]
    new_end_events = [e for e in events_new if e["event"] == "on_chat_model_end"]

    print(f"\nChatBedrock (Old):")
    print(f"  - Total events: {len(events_old)}")
    print(f"  - on_chat_model_stream events: {len(old_stream_events)}")
    print(f"  - on_chat_model_end events: {len(old_end_events)}")

    print(f"\nChatBedrockConverse (New):")
    print(f"  - Total events: {len(events_new)}")
    print(f"  - on_chat_model_stream events: {len(new_stream_events)}")
    print(f"  - on_chat_model_end events: {len(new_end_events)}")

    # Analysis based on actual usage scenario
    print("\n๐Ÿ” ANALYSIS OF YOUR MIGRATION SCENARIO:")
    print("=" * 50)

    if len(old_stream_events) == 0 and len(new_stream_events) == 0:
        print("๐Ÿ“‹ WHAT'S HAPPENING:")
        print("  - ChatBedrock doesn't support the OpenAI model you're now using")
        print("  - ChatBedrockConverse DOES support it, but emits events differently")
        print("  - Your guest service expects 'on_chat_model_stream' events")
        print("  - ChatBedrockConverse may emit streaming data via 'on_chain_stream' events")

    if len(new_end_events) > 0:
        print("\n๐Ÿ’ก SOLUTION APPROACH:")
        print("  Update your streaming logic to handle BOTH event types:")
        print("  1. Listen for 'on_chat_model_stream' (for backward compatibility)")
        print("  2. Listen for 'on_chain_stream' (for ChatBedrockConverse)")
        print("  3. Extract content from both event types consistently")

    print("\n๐ŸŽฏ KEY INSIGHT:")
    print("  This isn't a 'bug' - it's a difference in how the two clients")
    print("  emit streaming events for different model types.")

    # Show all unique event types for debugging
    print("\n๐Ÿ“‹ UNIQUE EVENT TYPES:")
    print(f"  ChatBedrock: {set(e['event'] for e in events_old)}")
    print(f"  ChatBedrockConverse: {set(e['event'] for e in events_new)}")

    # Test direct streaming to see if the model supports it
    await test_direct_streaming()


if __name__ == "__main__":
    # Set up basic logging to see any warnings/errors
    import logging
    logging.basicConfig(level=logging.INFO)

    # Run the test
    asyncio.run(main())

Error Message and Stack Trace (if applicable)

Description

When using ChatBedrockConverse with OpenAI models in LangGraph's astream_events(), the expected on_chat_model_stream events are not emitted. Instead, complete messages are delivered via on_chain_stream events.

Expected Behavior:
Both ChatBedrock and ChatBedrockConverse should emit on_chat_model_stream events for consistency.

Actual Behavior:
  • ChatBedrock (Anthropic models): ✅ emits on_chat_model_stream events
  • ChatBedrockConverse (OpenAI models): ❌ emits on_chain_stream events instead

Impact: Applications that rely on on_chat_model_stream events lose token-level streaming when migrating from Anthropic to OpenAI models.
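
In the meantime, consuming code can listen for both event types. Below is a minimal sketch of that workaround; the helper names (extract_text, stream_tokens) are mine, it uses the v2 event schema, and it assumes on_chain_stream chunks arrive as node-update dicts like {"agent": {"messages": [...]}}, which should be double-checked against the debug output above.

from langchain_core.messages import HumanMessage


def extract_text(content) -> str:
    """Flatten message content (a str or a list of content blocks) to plain text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            block.get("text", "")
            for block in content
            if isinstance(block, dict)
        )
    return ""


async def stream_tokens(graph, question: str) -> None:
    """Print streamed text from whichever event type the model client emits."""
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=question)]},
        version="v2",
    ):
        kind = event.get("event")
        data = event.get("data", {})
        if kind == "on_chat_model_stream":
            # Token-level path (works with ChatBedrock today)
            chunk = data.get("chunk")
            if chunk is not None:
                print(extract_text(chunk.content), end="", flush=True)
        elif kind == "on_chain_stream":
            # Fallback for the behavior reported here: ChatBedrockConverse
            # output arrives wrapped in node updates, e.g.
            # {"agent": {"messages": [AIMessage(...)]}} (verify locally)
            chunk = data.get("chunk")
            if isinstance(chunk, dict):
                for update in chunk.values():
                    if isinstance(update, dict):
                        for msg in update.get("messages", []):
                            text = extract_text(getattr(msg, "content", ""))
                            print(text, end="", flush=True)

This keeps token-level streaming for ChatBedrock while still surfacing ChatBedrockConverse output, although it does not restore true token-by-token streaming for the Converse client.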

System Info

My app runs in a Docker container built with only the minimum required packages, and the issue is reproducible in that environment:
System Information

OS: Windows
OS Version: 10.0.26100
Python Version: 3.13.3 (tags/v3.13.3:6280bb5, Apr 8 2025, 14:47:33) [MSC v.1943 64 bit (AMD64)]

Package Information

langchain_core: 0.3.76
langchain: 0.3.25
langsmith: 0.3.45
langchain_aws: 0.2.31
langchain_openai: 0.3.24
langchain_text_splitters: 0.3.8
langgraph_sdk: 0.1.69

Optional packages not installed

langserve

Other Dependencies

async-timeout<5.0.0,>=4.0.0;: Installed. No version info available.
beautifulsoup4: 4.13.4
bedrock-agentcore: Installed. No version info available.
boto3: 1.40.27
httpx: 0.28.1
jsonpatch<2.0,>=1.33: Installed. No version info available.
langchain-anthropic;: Installed. No version info available.
langchain-aws;: Installed. No version info available.
langchain-azure-ai;: Installed. No version info available.
langchain-cohere;: Installed. No version info available.
langchain-community;: Installed. No version info available.
langchain-core<1.0.0,>=0.3.51: Installed. No version info available.
langchain-core<1.0.0,>=0.3.58: Installed. No version info available.
langchain-core<1.0.0,>=0.3.65: Installed. No version info available.
langchain-deepseek;: Installed. No version info available.
langchain-fireworks;: Installed. No version info available.
langchain-google-genai;: Installed. No version info available.
langchain-google-vertexai;: Installed. No version info available.
langchain-groq;: Installed. No version info available.
langchain-huggingface;: Installed. No version info available.
langchain-mistralai;: Installed. No version info available.
langchain-ollama;: Installed. No version info available.
langchain-openai;: Installed. No version info available.
langchain-perplexity;: Installed. No version info available.
langchain-text-splitters<1.0.0,>=0.3.8: Installed. No version info available.
langchain-together;: Installed. No version info available.
langchain-xai;: Installed. No version info available.
langsmith-pyo3: Installed. No version info available.
langsmith<0.4,>=0.1.17: Installed. No version info available.
langsmith>=0.3.45: Installed. No version info available.
numpy: 2.3.2
openai-agents: Installed. No version info available.
openai<2.0.0,>=1.86.0: Installed. No version info available.
opentelemetry-api: 1.34.1
opentelemetry-exporter-otlp-proto-http: 1.34.1
opentelemetry-sdk: 1.34.1
orjson: 3.10.18
packaging: 24.2
packaging>=23.2: Installed. No version info available.
playwright: Installed. No version info available.
pydantic: 2.11.4
pydantic<3.0.0,>=2.7.4: Installed. No version info available.
pydantic>=2.7.4: Installed. No version info available.
pytest: 8.4.1
PyYAML>=5.3: Installed. No version info available.
requests: 2.32.3
requests-toolbelt: 1.0.0
requests<3,>=2: Installed. No version info available.
rich: 14.0.0
SQLAlchemy<3,>=1.4: Installed. No version info available.
tenacity!=8.4.0,<10.0.0,>=8.1.0: Installed. No version info available.
tiktoken<1,>=0.7: Installed. No version info available.
typing-extensions>=4.7: Installed. No version info available.
zstandard: 0.23.0