langflow-ai/langflow

API streaming responses return a 500 error

Closed this issue · 1 comment

Bug Description

When a Langflow flow is called through the API with streaming enabled, the response is the following error:
{"message":"["Something went wrong while serializing the response. Please share this error on our GitHub repository.", "Unable to serialize unknown type: <class 'langchain_core.messages.ai.AIMessageChunk'>"]"}

Example API call:
url = f"{BASE_API_URL}/api/v1/run/{flow_id}?stream=True"

Reproduction

Basic Python reproduction code for testing:
import logging
import time

import httpx

BASE_API_URL = "http://127.0.0.1:7860"
FLOW_ID = "18b6553e-f0d7-4d7b-b191-7db803a53776"
ENDPOINT = "bulletpoints"  # The endpoint name of the flow

def is_healthy():
    try:
        url = f"{BASE_API_URL}/health"
        response = httpx.get(url)
        return response.status_code == 200
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        return False

def initiate_session(flow_id, input_value, stream: bool = True):
    url = f"{BASE_API_URL}/api/v1/run/{flow_id}?stream={stream}"
    headers = {"Content-Type": "application/json"}
    data = {
        "input_value": input_value,
    }
    logging.info(f"Initiating session with data: {data}")

    # Wait until the Langflow server reports healthy before posting the run request
    healthy = is_healthy()
    while not healthy:
        time.sleep(1)
        healthy = is_healthy()

    logging.info(f"Health check passed: {healthy}")

    response = httpx.post(url, json=data, headers=headers)
    print(response.text)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception("Failed to initiate session")

def stream_response(stream_url, session_id):
    # The run endpoint returns a relative stream path; prefix it with the API base URL
    url = f"{BASE_API_URL}{stream_url}"

    params = {"session_id": session_id}

    with httpx.stream("GET", url, params=params, timeout=None) as response:
        for line in response.iter_lines():
            # lines are data: {"chunk":","} and are strings
            logging.info(f"Line: {line}")

def run_flow(message: str) -> dict:
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    init_response = initiate_session(
        flow_id=FLOW_ID,
        input_value=message,
        stream=True,
    )
    logging.info(f"Init Response: {init_response}")
    session_id = init_response["session_id"]
    has_stream_url = "stream_url" in init_response["outputs"][0]["outputs"][0]["artifacts"]
    if not has_stream_url:
        raise Exception("No stream URL returned")
    stream_url = init_response["outputs"][0]["outputs"][0]["artifacts"]["stream_url"]

    logging.info(f"Initiated session with ID: {session_id}")
    if stream_url:
        stream_response(stream_url, session_id)
    return init_response

def main():
    response = run_flow(message="HEllo worlllld !")

if __name__ == "__main__":
    main()

Expected behavior

Expecting a 200 status response instead of a 500 error.

Who can help?

No response

Operating System

Windows 11

Langflow Version

1.0.18

Python Version

None

Screenshot

No response

Flow File

langflow_Summarize tool.json

Hey @EnricoBaivo! I'm here to help you with any bugs, questions, or contributions. Let's tackle this issue together.

The 500 error with the message "Something went wrong while serializing the response" occurs because the stream_build endpoint in chat.py catches any exception raised while streaming and re-raises it as an HTTPException with a 500 status code. In this case, the underlying error indicates that the AIMessageChunk class from langchain_core.messages.ai is not being serialized correctly [1].
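
The failure can be reproduced outside of Langflow with just langchain_core and the standard library; this is a minimal sketch (the chunk value is illustrative) showing why json.dumps rejects the chunk object while its plain-text content serializes fine:

import json

from langchain_core.messages.ai import AIMessageChunk

chunk = AIMessageChunk(content="Hello")

try:
    json.dumps(chunk)  # raises TypeError: Object of type AIMessageChunk is not JSON serializable
except TypeError as exc:
    print(f"Serialization failed: {exc}")

# The chunk's text content is a plain string and serializes without issues
print(json.dumps({"chunk": chunk.content}))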

To address this issue, you need to ensure that the AIMessageChunk class is properly serialized. Here are a few steps you can take:

  1. Check Serialization Logic: Ensure that the AIMessageChunk class has a proper serialization method. This might involve implementing a custom JSON encoder for this class.

  2. Update Endpoint: Modify the stream_build endpoint to handle the serialization of AIMessageChunk objects correctly (a generator sketch follows the encoder example below).

  3. Debugging: Add logging to identify where the serialization is failing; this can help pinpoint the exact issue (see the sketch right after this list).
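
For the debugging step, here is a minimal sketch of how the failing object could be surfaced before the response is sent; serialize_payload is a hypothetical helper standing in for wherever the response body is serialized, not Langflow's actual code:

import json
import logging

def serialize_payload(payload):
    # Hypothetical wrapper around the serialization step, used only to log failures
    try:
        return json.dumps(payload)
    except TypeError:
        logging.exception("Failed to serialize object of type %s", type(payload).__name__)
        raise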

Here is a simplified example of how you might implement a custom JSON encoder for the AIMessageChunk class:

import json
from langchain_core.messages.ai import AIMessageChunk

class AIMessageChunkEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, AIMessageChunk):
            return obj.__dict__  # or implement a method to convert to a serializable dict
        return super().default(obj)

# Use the custom encoder when serializing
json.dumps(your_data, cls=AIMessageChunkEncoder)
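
For the endpoint update, the same conversion can be applied where the streamed chunks are produced, so that each AIMessageChunk is reduced to plain text (or a dict) before it is written to the event stream. This is only an illustrative sketch assuming a hypothetical async generator over a LangChain streaming call, not Langflow's actual stream_build implementation:

import json
from typing import AsyncIterator

from langchain_core.messages.ai import AIMessageChunk

async def event_stream(chunks: AsyncIterator[AIMessageChunk]) -> AsyncIterator[str]:
    # Reduce each chunk to a JSON-serializable payload before yielding it as an SSE line
    async for chunk in chunks:
        payload = {"chunk": chunk.content}
        yield f"data: {json.dumps(payload)}\n\n"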

By ensuring that the AIMessageChunk class is properly serialized, you should be able to resolve the 500 error and get the expected 200 status response.

To continue talking to Dosu, mention @dosu.