ChatBedrockConverse does not emit on_chat_model_stream events in LangGraph astream_events()
Checked other resources
- This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
- I added a clear and detailed title that summarizes the issue.
- I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
- I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.
Example Code
#!/usr/bin/env python3
"""
Minimal Reproducible Example (MRE) for the ChatBedrock vs ChatBedrockConverse streaming event issue.

This script demonstrates the difference in event emission between ChatBedrock and ChatBedrockConverse
when used with LangGraph's astream_events().

Expected behavior:
- ChatBedrock should emit 'on_chat_model_stream' events during streaming
- ChatBedrockConverse may emit different events or no streaming events

Run with: python debug_bedrock_streaming.py
"""
import asyncio
from typing import Dict, Any, List

from langchain_aws import ChatBedrock, ChatBedrockConverse
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent


def create_react_agent_with_bedrock(model_client):
    """Create a LangGraph agent using create_react_agent (like the actual guest agent)."""
    # Simple system prompt
    system_prompt = """You are a helpful assistant. Answer questions concisely."""
    # Create the agent using create_react_agent (same as the guest agent)
    graph = create_react_agent(
        model=model_client,
        tools=[],  # No tools, just conversation
        prompt=system_prompt,
        name="test_agent",
    )
    return graph
async def test_streaming_events(graph, test_name: str) -> List[Dict[str, Any]]:
    """Test streaming events from a LangGraph graph and collect all events."""
    print(f"\n=== Testing {test_name} ===")
    events_collected = []
    test_message = "Hello, can you tell me about personal finance in one sentence?"
    try:
        async for event in graph.astream_events(
            {"messages": [HumanMessage(content=test_message)]},
            version="v1",
        ):
            event_type = event.get("event", "unknown")
            name = event.get("name", "unknown")
            print(f"Event: {event_type} | Name: {name}")
            # Collect all events for analysis
            events_collected.append({
                "event": event_type,
                "name": name,
                "data_keys": list(event.get("data", {}).keys()) if event.get("data") else [],
            })
            # Specifically check for streaming events
            if event_type == "on_chat_model_stream":
                print("  ✅ FOUND: on_chat_model_stream event!")
                # Try to extract the streaming content
                data = event.get("data", {})
                if "chunk" in data:
                    chunk = data["chunk"]
                    print(f"    Chunk type: {type(chunk)}")
                    if hasattr(chunk, "content"):
                        print(f"    Content: {chunk.content}")
                    else:
                        print(f"    Chunk repr: {repr(chunk)}")
            elif event_type == "on_chat_model_end":
                print("  FOUND: on_chat_model_end event (alternative to streaming)")
                # Try to extract the final content
                data = event.get("data", {})
                if "output" in data:
                    output = data["output"]
                    if hasattr(output, "content"):
                        print(f"    Final content: {output.content}")
            elif event_type == "on_chain_stream":
                print("  FOUND: on_chain_stream event")
                # This might contain the streaming data too
                data = event.get("data", {})
                print(f"    Stream data keys: {list(data.keys())}")
                # Check if this contains the actual streaming content
                if "chunk" in data:
                    chunk = data["chunk"]
                    print(f"    Chunk type: {type(chunk)}")
                    print(f"    Chunk keys (if dict): {list(chunk.keys()) if isinstance(chunk, dict) else 'N/A'}")
                    # Deep inspection of the chunk content
                    if isinstance(chunk, dict):
                        for key, value in chunk.items():
                            print(f"      {key}: {type(value)} = {repr(value)}")
                    elif hasattr(chunk, "content"):
                        content = chunk.content
                        print(f"    Chunk content: {repr(content)}")
                        if isinstance(content, list) and content:
                            # Handle structured content like in the guest service
                            for item in content:
                                if isinstance(item, dict) and "text" in item:
                                    text = item["text"]
                                    print(f"    Extracted text: '{text}'")
                        elif isinstance(content, str):
                            print(f"    String content: '{content}'")
                    else:
                        print(f"    Chunk repr: {repr(chunk)}")
    except Exception as e:
        print(f"❌ Error during streaming: {e}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")
        return []
    return events_collected
async def test_direct_streaming():
    """Test direct streaming from ChatBedrockConverse without LangGraph."""
    print("\nTESTING DIRECT STREAMING (bypassing LangGraph)")
    print("=" * 50)
    model_id = "openai.gpt-oss-120b-1:0"
    region = "us-west-2"
    try:
        # Test with default parameters (no streaming parameter needed)
        chat_model = ChatBedrockConverse(
            model_id=model_id,
            region_name=region,
            temperature=0.4,
        )
        print("Testing direct astream() method...")
        messages = [HumanMessage(content="Hello, tell me about personal finance in one sentence")]
        chunks_received = 0
        full_content = ""
        async for chunk in chat_model.astream(messages):
            chunks_received += 1
            print(f"Chunk {chunks_received}: {type(chunk)}")
            # Extract content from the chunk
            if hasattr(chunk, "content"):
                content = chunk.content
                if isinstance(content, list):
                    for item in content:
                        if isinstance(item, dict) and "text" in item:
                            text = item.get("text", "")
                            if text:
                                print(f"  Text: '{text}'")
                                full_content += text
                        elif isinstance(item, str):
                            print(f"  String: '{item}'")
                            full_content += item
                elif isinstance(content, str):
                    print(f"  Content: '{content}'")
                    full_content += content
                else:
                    print(f"  Content: {repr(content)}")
            # Show response metadata if available
            if hasattr(chunk, "response_metadata") and chunk.response_metadata:
                print(f"  Metadata: {chunk.response_metadata}")
            print(f"  Accumulated so far: '{full_content}'\n")
        print("\nDIRECT STREAMING RESULTS:")
        print(f"  - Chunks received: {chunks_received}")
        if chunks_received > 0:
            print("  ✅ SUCCESS: Direct streaming works!")
        else:
            print("  ❌ FAILED: No chunks received")
    except Exception as e:
        print(f"❌ DIRECT STREAMING ERROR: {e}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")
async def main():
    """Main test function comparing both Bedrock clients."""
    # Configuration - the CURRENT setup actually in use
    model_id = "openai.gpt-oss-120b-1:0"  # Current OpenAI model (only works with ChatBedrockConverse)
    region = "us-west-2"  # Current region
    print(f"Testing with model: {model_id}")
    print(f"Testing with region: {region}")
    print("NOTE: ChatBedrock doesn't support this OpenAI model, so we'll see AccessDenied for it")
    print()
    print("Testing Bedrock Streaming Event Emission")
    print("=" * 50)

    # Test 1: ChatBedrock (old implementation)
    print("\n--- Setting up ChatBedrock (old) ---")
    bedrock_old = ChatBedrock(
        model_id=model_id,
        region_name=region,
        temperature=0.4,
        streaming=True,  # Explicitly enable streaming
    )
    graph_old = create_react_agent_with_bedrock(bedrock_old)
    events_old = await test_streaming_events(graph_old, "ChatBedrock (Old)")

    # Test 2: ChatBedrockConverse (new implementation)
    print("\n--- Setting up ChatBedrockConverse (new) ---")
    bedrock_new = ChatBedrockConverse(
        model_id=model_id,
        region_name=region,
        temperature=0.4,
        # Note: ChatBedrockConverse may handle streaming differently
    )
    graph_new = create_react_agent_with_bedrock(bedrock_new)
    events_new = await test_streaming_events(graph_new, "ChatBedrockConverse (New)")

    # Analysis
    print("\n" + "=" * 50)
    print("ANALYSIS")
    print("=" * 50)
    # Check for streaming events in both tests
    old_stream_events = [e for e in events_old if e["event"] == "on_chat_model_stream"]
    new_stream_events = [e for e in events_new if e["event"] == "on_chat_model_stream"]
    old_end_events = [e for e in events_old if e["event"] == "on_chat_model_end"]
    new_end_events = [e for e in events_new if e["event"] == "on_chat_model_end"]
    print("\nChatBedrock (Old):")
    print(f"  - Total events: {len(events_old)}")
    print(f"  - on_chat_model_stream events: {len(old_stream_events)}")
    print(f"  - on_chat_model_end events: {len(old_end_events)}")
    print("\nChatBedrockConverse (New):")
    print(f"  - Total events: {len(events_new)}")
    print(f"  - on_chat_model_stream events: {len(new_stream_events)}")
    print(f"  - on_chat_model_end events: {len(new_end_events)}")

    # Analysis based on the actual usage scenario
    print("\nANALYSIS OF YOUR MIGRATION SCENARIO:")
    print("=" * 50)
    if len(old_stream_events) == 0 and len(new_stream_events) == 0:
        print("WHAT'S HAPPENING:")
        print("  - ChatBedrock doesn't support the OpenAI model you're now using")
        print("  - ChatBedrockConverse DOES support it, but emits events differently")
        print("  - Your guest service expects 'on_chat_model_stream' events")
        print("  - ChatBedrockConverse may emit streaming data via 'on_chain_stream' events")
    if len(new_end_events) > 0:
        print("\nSOLUTION APPROACH:")
        print("  Update your streaming logic to handle BOTH event types:")
        print("  1. Listen for 'on_chat_model_stream' (for backward compatibility)")
        print("  2. Listen for 'on_chain_stream' (for ChatBedrockConverse)")
        print("  3. Extract content from both event types consistently")
    print("\nKEY INSIGHT:")
    print("  This isn't a 'bug' - it's a difference in how the two clients")
    print("  emit streaming events for different model types.")

    # Show all unique event types for debugging
    print("\nUNIQUE EVENT TYPES:")
    print(f"  ChatBedrock: {set(e['event'] for e in events_old)}")
    print(f"  ChatBedrockConverse: {set(e['event'] for e in events_new)}")

    # Test direct streaming to see if the model supports it
    await test_direct_streaming()


if __name__ == "__main__":
    # Set up basic logging to see any warnings/errors
    import logging
    logging.basicConfig(level=logging.INFO)
    # Run the test
    asyncio.run(main())

Error Message and Stack Trace (if applicable)
Description
When using ChatBedrockConverse with OpenAI models in LangGraph's astream_events(), the expected on_chat_model_stream events are not emitted. Instead, complete messages are delivered via on_chain_stream events.
Expected Behavior:
Both ChatBedrock and ChatBedrockConverse should emit on_chat_model_stream events for consistency
Actual Behavior:
ChatBedrock (Anthropic models): ✅ Emits on_chat_model_stream events
ChatBedrockConverse (OpenAI models): ❌ Emits on_chain_stream events instead
Impact: Applications relying on on_chat_model_stream events lose streaming functionality when migrating from Anthropic to OpenAI models.
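For anyone who needs streaming to keep working during the migration, the consumer-side workaround from the script's "SOLUTION APPROACH" output is to listen for both event types. Below is a minimal sketch, not a definitive fix: the shape of the on_chain_stream chunk (a dict of graph-state updates containing a "messages" list) is an assumption based on the inspection prints in the MRE above, and extract_text is a hypothetical helper, not a langchain-aws API.

def extract_text(content):
    # Hypothetical helper: normalize message content to plain text.
    # ChatBedrock yields plain strings; ChatBedrockConverse can yield
    # structured content like [{"type": "text", "text": "..."}].
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(item.get("text", "") for item in content if isinstance(item, dict))
    return ""


async def stream_text(graph, inputs):
    # Yield text whether the client emits token-level or chunk-level events.
    async for event in graph.astream_events(inputs, version="v1"):
        kind = event.get("event")
        data = event.get("data", {})
        if kind == "on_chat_model_stream":
            # ChatBedrock path: incremental AIMessageChunk objects
            text = extract_text(getattr(data.get("chunk"), "content", ""))
            if text:
                yield text
        elif kind == "on_chain_stream" and isinstance(data.get("chunk"), dict):
            # ChatBedrockConverse path (observed): complete messages arrive
            # inside a dict of graph-state updates (assumed shape)
            for update in data["chunk"].values():
                if isinstance(update, dict):
                    for msg in update.get("messages", []):
                        text = extract_text(getattr(msg, "content", ""))
                        if text:
                            yield text

Since on_chain_stream delivers complete messages rather than incremental tokens, a real implementation would also need to deduplicate against text already emitted via on_chat_model_stream.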
System Info
My app runs in a Docker container with the minimum packages, and the issue is reproducible in this environment:
System Information
OS: Windows
OS Version: 10.0.26100
Python Version: 3.13.3 (tags/v3.13.3:6280bb5, Apr 8 2025, 14:47:33) [MSC v.1943 64 bit (AMD64)]
Package Information
langchain_core: 0.3.76
langchain: 0.3.25
langsmith: 0.3.45
langchain_aws: 0.2.31
langchain_openai: 0.3.24
langchain_text_splitters: 0.3.8
langgraph_sdk: 0.1.69
Optional packages not installed
langserve
Other Dependencies
async-timeout<5.0.0,>=4.0.0;: Installed. No version info available.
beautifulsoup4: 4.13.4
bedrock-agentcore: Installed. No version info available.
boto3: 1.40.27
httpx: 0.28.1
jsonpatch<2.0,>=1.33: Installed. No version info available.
langchain-anthropic;: Installed. No version info available.
langchain-aws;: Installed. No version info available.
langchain-azure-ai;: Installed. No version info available.
langchain-cohere;: Installed. No version info available.
langchain-community;: Installed. No version info available.
langchain-core<1.0.0,>=0.3.51: Installed. No version info available.
langchain-core<1.0.0,>=0.3.58: Installed. No version info available.
langchain-core<1.0.0,>=0.3.65: Installed. No version info available.
langchain-deepseek;: Installed. No version info available.
langchain-fireworks;: Installed. No version info available.
langchain-google-genai;: Installed. No version info available.
langchain-google-vertexai;: Installed. No version info available.
langchain-groq;: Installed. No version info available.
langchain-huggingface;: Installed. No version info available.
langchain-mistralai;: Installed. No version info available.
langchain-ollama;: Installed. No version info available.
langchain-openai;: Installed. No version info available.
langchain-perplexity;: Installed. No version info available.
langchain-text-splitters<1.0.0,>=0.3.8: Installed. No version info available.
langchain-together;: Installed. No version info available.
langchain-xai;: Installed. No version info available.
langsmith-pyo3: Installed. No version info available.
langsmith<0.4,>=0.1.17: Installed. No version info available.
langsmith>=0.3.45: Installed. No version info available.
numpy: 2.3.2
openai-agents: Installed. No version info available.
openai<2.0.0,>=1.86.0: Installed. No version info available.
opentelemetry-api: 1.34.1
opentelemetry-exporter-otlp-proto-http: 1.34.1
opentelemetry-sdk: 1.34.1
orjson: 3.10.18
packaging: 24.2
packaging>=23.2: Installed. No version info available.
playwright: Installed. No version info available.
pydantic: 2.11.4
pydantic<3.0.0,>=2.7.4: Installed. No version info available.
pydantic>=2.7.4: Installed. No version info available.
pytest: 8.4.1
PyYAML>=5.3: Installed. No version info available.
requests: 2.32.3
requests-toolbelt: 1.0.0
requests<3,>=2: Installed. No version info available.
rich: 14.0.0
SQLAlchemy<3,>=1.4: Installed. No version info available.
tenacity!=8.4.0,<10.0.0,>=8.1.0: Installed. No version info available.
tiktoken<1,>=0.7: Installed. No version info available.
typing-extensions>=4.7: Installed. No version info available.
zstandard: 0.23.0