jina-ai/jina

Streaming breaks for HTTP client when fields are not str, int, or float

NarekA opened this issue · 10 comments

Describe the bug

HTTP streaming breaks when documents have fields that are not str, int, or float. This includes dicts, bools, and nested objects. The example below works for gRPC but not for HTTP.

import asyncio
import json
from typing import Dict

from docarray import BaseDoc
from jina import Executor, Deployment, Client, requests
from pydantic import Field


# first define schemas
class MyDocument(BaseDoc):
    text: str
    # a non-scalar field: this is what breaks HTTP streaming
    other_doc: Dict = Field(default_factory=dict)

    def to_request_data(self):
        return json.dumps({"params": self.dict()}, indent=2)


# then define the Executor
class MyExecutor(Executor):
    @requests(on="/stream")
    async def task(self, doc: MyDocument, **kwargs) -> MyDocument:
        for i in range(10):
            yield MyDocument(text=f"hello world {doc.text} {i}")
            # non-blocking sleep, so the event loop stays responsive
            await asyncio.sleep(1)


example_doc = MyDocument(text="my input text")


async def stream_docs():
    protocol = "http"
    with Deployment(
        uses=MyExecutor, port=[12346], protocol=[protocol], cors=True
    ) as dep:
        client = Client(port=12346, protocol=protocol, asyncio=True)
        async for doc in client.stream_doc(
            on="/stream",
            inputs=example_doc,
            input_type=MyDocument,
            return_type=MyDocument,
        ):
            print(doc)



if __name__ == "__main__":
    import asyncio

    asyncio.run(stream_docs())

Describe how you solve it

The source of the problem seems to be here. The input is passed to the streaming executor as query parameters instead of as a request body. It should do one of the following:

  1. Use HTTP GET with a body. See this link

    Sending a body with a GET request has an undefined behavior in the specifications, nevertheless, it is supported by FastAPI, only for very complex/extreme use cases.

  2. Use a different HTTP method

  3. Serialize/deserialize the fields before I/O.


Screenshots

Here is a stacktrace:

Traceback (most recent call last):
  File ".../app.py", line 71, in <module>
    asyncio.run(stream_docs())
  File "...python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "...python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File ".../app.py", line 53, in stream_docs
    async for doc in client.stream_doc(
  File "...python3.9/site-packages/jina/clients/mixin.py", line 569, in stream_doc
    async for doc in c._get_streaming_results(
  File "...python3.9/site-packages/jina/clients/base/http.py", line 258, in _get_streaming_results
    async for doc in iolet.send_streaming_message(doc=inputs, on=on):
  File "...python3.9/site-packages/jina/clients/base/helper.py", line 212, in send_streaming_message
    async with self.session.get(**request_kwargs) as response:
  File "...python3.9/site-packages/aiohttp/client.py", line 1167, in __aenter__
    self._resp = await self._coro
  File "...python3.9/site-packages/aiohttp/client.py", line 534, in _request
    req = self._request_class(
  File "...python3.9/site-packages/aiohttp/client_reqrep.py", line 280, in __init__
    url2 = url.with_query(params)
  File "...python3.9/site-packages/yarl/_url.py", line 1007, in with_query
    new_query = self._get_str_query(*args, **kwargs) or ""
  File "...python3.9/site-packages/yarl/_url.py", line 968, in _get_str_query
    query = "&".join(self._query_seq_pairs(quoter, query.items()))
  File "...python3.9/site-packages/yarl/_url.py", line 931, in _query_seq_pairs
    yield quoter(key) + "=" + quoter(cls._query_var(val))
  File "...python3.9/site-packages/yarl/_url.py", line 946, in _query_var
    raise TypeError(
TypeError: Invalid variable type: value should be str, int or float, got {} of type <class 'dict'>

Note that removing the MyDocument.other_doc field makes the example work again.
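The type restriction behind the traceback can be illustrated without Jina. The sketch below is a simplified stand-in for the check that raises the TypeError; `unsupported_query_fields` is a hypothetical helper, not part of yarl, aiohttp, or Jina. It assumes, as reported above, that only str, int, and float values are accepted and that bool is rejected even though it is a subclass of int.

```python
from typing import Any, Dict, List


def unsupported_query_fields(params: Dict[str, Any]) -> List[str]:
    """Return the names of fields that cannot be sent as query parameters.

    Hypothetical helper: it mirrors the rule described in this issue
    (str, int, and float are accepted; bool, dict, and nested objects are not).
    """
    bad = []
    for name, value in params.items():
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            bad.append(name)
    return bad


doc_fields = {"text": "my input text", "other_doc": {}, "flag": True, "score": 0.5}
print(unsupported_query_fields(doc_fields))  # ['other_doc', 'flag']
```

Running a check like this on `doc.dict()` before calling the HTTP client would at least turn the failure into a clear message that names the offending fields.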

Hey @NarekA,

Thanks for reporting the issue. If you have a fix proposal, we would be glad to get the contribution.

Of these solutions, do any of them make more sense to you?

  1. Use HTTP GET with a body. See this link

    Sending a body with a GET request has an undefined behavior in the specifications, nevertheless, it is supported by FastAPI, only for very complex/extreme use cases.

I like this one, but it looks like FastAPI might not include the fields in the docs.

  2. Use a different HTTP method

The best solution, if it's possible.

  3. Serialize/deserialize the fields before I/O.

It might make more sense to serialize the whole object as one param rather than try to serialize individual params.
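The single-parameter idea could look roughly like the sketch below, which uses only the standard library. The parameter name `doc` and both helper functions are hypothetical and are not Jina's API; the sketch only shows that a nested document survives a round trip through one JSON-encoded query parameter.

```python
import json
from urllib.parse import parse_qs, urlencode


def encode_doc_as_param(doc: dict, key: str = "doc") -> str:
    """Client side: serialize the whole document to JSON in one query parameter."""
    return urlencode({key: json.dumps(doc)})


def decode_doc_from_query(query: str, key: str = "doc") -> dict:
    """Server side: read the parameter back and parse the JSON."""
    return json.loads(parse_qs(query)[key][0])


doc = {"text": "my input text", "other_doc": {"nested": [1, 2]}, "flag": True}
query = encode_doc_as_param(doc)

# The nested dict, the list, and the bool all survive the round trip.
assert decode_doc_from_query(query) == doc
```

One trade-off of this approach is that large documents produce long URLs, and servers and proxies commonly limit URL length.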

Any reason we can't use HTTP POST?

@JoanFM Any reason this is using GET?

@alaeddine-13 may have better insights as per this

@NarekA Actually, the convention when implementing SSE over HTTP is to use the GET method instead of POST. The drawback of GET is that the document has to be serialized as query string parameters. The benefit is that developers can use clients that follow the SSE convention. For instance, the JavaScript EventSource client (documented by Mozilla) implements an SSE client using the GET method.
I suggest flattening your document schema in this case.
@JoanFM I remember wanting to include a duplicate POST endpoint for such cases: if you want to follow the convention, use the GET endpoint; if you want nested schemas, use the POST endpoint.
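For reference, the flattening workaround mentioned above might look like the following sketch. The `flatten` helper and the `__` separator are hypothetical choices, not something Jina or DocArray provides. The sketch handles only nested dicts and bools; lists are not handled, and empty dicts disappear, which is part of why flattening loses information.

```python
from typing import Any, Dict


def flatten(doc: Dict[str, Any], prefix: str = "", sep: str = "__") -> Dict[str, Any]:
    """Flatten nested dicts into a single level of query-safe scalar values."""
    flat: Dict[str, Any] = {}
    for name, value in doc.items():
        key = f"{prefix}{sep}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse, carrying the parent key as the prefix.
            flat.update(flatten(value, key, sep))
        elif isinstance(value, bool):
            # bool is rejected as a query value, so send it as a string.
            flat[key] = str(value).lower()
        else:
            flat[key] = value
    return flat


doc = {"text": "hi", "other_doc": {"lang": "en", "meta": {"page": 3}}, "flag": True}
print(flatten(doc))
# {'text': 'hi', 'other_doc__lang': 'en', 'other_doc__meta__page': 3, 'flag': 'true'}
```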

@alaeddine-13 We are currently working around this by "flattening" the document, but that effectively removes any benefit of the upgrade to DocArray v2. It also means that the framework doesn't support all DocArray inputs (for HTTP at least), and that should be communicated. Would it be hard to create a second endpoint? I could look into this one if it helps.

We are definitely going to look into solving this issue. You are right, this is not stated in the documentation.

@JoanFM @alaeddine-13 I drafted a PR that seems to fix this issue for me: https://github.com/jina-ai/jina/pull/6091/files

It seems nice, @NarekA.

I added some comments to the PR.