sneako/finch

Lazy streaming

dmkl opened this issue · 23 comments

dmkl commented

Is it possible to handle a GET request as a lazy stream, processing it chunk by chunk "externally"? Something like:

finch_stream
|> Stream.map(&handle_each_chunk/1)
|> Stream.run()

That is, chunks would be pulled one by one from outside of finch_stream. I'm not sure whether it's possible to use %Stream{} in the accumulator function of Finch.stream, or to wrap it in Stream.resource somehow.

I would also like this feature so I can perform a request, do some processing and send the result out immediately with Plug.Conn.send_chunked, without having to buffer anything, as that would take a lot of memory.

It doesn't seem possible with the current API (without an extra process), but I think Mint should be able to do it in passive mode with Stream.resource, or with a buffer process and Enumerable.t(). @josevalim as you wrote the original implementation in #59, do you recall if there was a specific reason for using an accumulator instead of producing a stream, or did it just fit the use case better at the time?

In the current state, it is not possible to get a Stream out of Finch. Finch was designed for hammering a static set of hosts with a high rate of requests (we had this use case where I work, as well as where Keathley was working at the time). For this reason, the library is focused on connection reuse and limiting copying whenever possible. We have extended the feature set a bit where it has been possible to do so without adding overhead, but it was never intended to be a general purpose, full-featured client.

@hkrutzer I'm probably missing something, and I have not worked with Plug.Conn.send_chunked/2 myself, but why wouldn't something like this work?

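      # The accumulator is the Plug.Conn; every clause returns the conn to use next.
      # `process/1` and `finch_name` are placeholders.
      fun = fn
        # Status and headers from the upstream response are ignored here.
        {:status, _value}, conn ->
          conn

        {:headers, _value}, conn ->
          conn

        # Process each body fragment and forward it to the client right away.
        {:data, value}, conn ->
          processed = process(value)

          case Plug.Conn.chunk(conn, processed) do
            {:ok, conn} -> conn
            {:error, _} -> raise "failed to send chunk"
          end
      end

      # Start the chunked response before any upstream data arrives.
      conn = Plug.Conn.send_chunked(conn, :ok)

      Finch.build(:get, "http://example.com")
      |> Finch.stream(finch_name, conn, fun)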

Thanks for looking into my example @sneako. Indeed there is something missing but I didn't specify it: I am working with newline-delimited JSON, which I need to be able to decode. So while your example works if you just stream the data in and out, or if you can process any fragment of the data separately, for nd-JSON I would need a complete line to decode it properly. This also applies to similar use cases like streaming XML. This could be accomplished with Stream functions, but not with the current Finch.stream.

"Finch was designed for hammering a static set of hosts with a high rate of requests"

Indeed this is why I am using Finch currently; to load data from a database that provides an HTTP endpoint. The connection pooling that Finch provides is very suitable for this.

"We have extended the feature set a bit where it has been possible to do so without adding overhead, but it was never intended to be a general purpose, full-featured client."

Of course it is your prerogative as maintainer to decide what Finch should be 🙂. For me at least, I am using Finch because it is a high quality library, and because Mint is also high quality. There isn't really another option for using Mint without handling the low level stuff yourself, while getting the same performance as Finch. https://github.com/elixir-mint/mint#connection-management-and-pooling also mentions Finch as a higher-level client. There is Req but this in turn uses Finch.

So while I understand the goals behind the library, I wonder if there isn't a solution where we can keep those in mind, and resolve the issue described here as well.

Thanks for the kind words and additional context! It sounds like you just need to also manage a small buffer in the stream accumulator, where you can store the incomplete JSON line until you have received the entire line. Does that make sense?
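A minimal sketch of that buffering idea, run against a hand-written list of chunks rather than a live request. `LineBuffer` and the `handle` function are hypothetical names, not part of Finch; `handle` only mirrors the `{:status | :headers | :data, value}, acc` shape of the callback shown earlier, with the accumulator holding the incomplete line plus the complete lines seen so far.

```elixir
defmodule LineBuffer do
  # Hypothetical helper, not part of Finch.
  # Appends `chunk` to `buffer` and returns {complete_lines, leftover}.
  def split(buffer, chunk) do
    parts = String.split(buffer <> chunk, "\n")
    # The last element is whatever followed the final newline ("" if none).
    {lines, [rest]} = Enum.split(parts, -1)
    {lines, rest}
  end
end

# Same shape as a stream callback: the accumulator is {buffer, lines_so_far}.
handle = fn
  {:data, chunk}, {buffer, lines} ->
    {complete, rest} = LineBuffer.split(buffer, chunk)
    # Appending is fine for a sketch; a real handler would decode and emit here.
    {rest, lines ++ complete}

  _status_or_headers, acc ->
    acc
end

# Simulated response whose second JSON line is split across two chunks.
chunks = [
  {:status, 200},
  {:headers, []},
  {:data, ~s({"a":1}\n{"b")},
  {:data, ~s(":2}\n)}
]

{rest, lines} = Enum.reduce(chunks, {"", []}, handle)
```

With Finch.stream, the same function could be passed as the callback with `{"", []}` as the initial accumulator. In the Plug case, each complete line would be decoded and sent with Plug.Conn.chunk/2 instead of being collected.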

Indeed, you are right. I feel stupid for not thinking of that 😅

Ha, not at all :) Just to clarify, I'm not against lazy streaming in Finch, it just was never part of our design goals so there is no obvious way of shoehorning it in at this point.

I am completely willing to review proposals of how we could do this while sticking to our previously stated design goals.

I get the impression that most things people hope to achieve with a Stream API are already possible with the current API, as we discovered for your specific use case. It might just be nicer if we could use the Stream module, which is why I haven't allocated much time to exploring this route.

Hi @sneako! First, I want to reiterate the above in saying thank you for providing and maintaining Finch — it’s a great tool in its own right and a great building block for things like Req.

My hope in posting here is to better understand the limitations of Finch’s design that prevent a streaming mechanism from being introduced. Having read much of the codebase, I think I’m seeing the shape of things, but I wanted to write it down and get your feedback/corrections.

(Note: what follows is focused on HTTP/1 for now.)

  • The fundamental problem with the current API is that it is callback-based and blocking, which means it is impossible to convert a call to Finch.stream into a lazy enumerable without the use of a second process and message passing.

  • The “chunks” given to the callback fun come from Mint’s recv, which is called in a loop until the response is complete, the timeout is hit, or the connection is closed (by raising in the callback fun).

  • In order to provide a streaming API that doesn’t require a second process, control would need to be “inverted” such that the caller is driving the underlying recv loop. This might hypothetically look something like:

    {{:status, 200}, conn} = Finch.stream(MyFinch, request)
    {{:headers, _}, conn} = Finch.stream_recv(conn)
    {{:data, _}, conn} = Finch.stream_recv(conn)
    {:done, conn} = Finch.stream_recv(conn)

In this hypothetical example, Finch acts as a sort of higher-level wrapper for Mint, but lower-level than what Finch currently exposes. I think that all of Finch’s current “amenities” would still be possible with this inversion of control:

  • The conn returned above could be an opaque type that is essentially a continuation for the request, containing or closing over all the state that’s currently tracked internally.
  • The initial call could still block until NimblePool allows the checkout and the final call to stream_recv would release the resource.
  • Telemetry/etc. could still be emitted as usual.
  • The existing callback-based/blocking-loop API could be implemented on top of this.

One possible issue (and I’m sure there are more) is that I’m not sure what to do if the caller exits/crashes, which means the resource isn’t explicitly released back to NimblePool, exception telemetry can’t be sent, etc.

Would be very interested to hear thoughts on this. If something like this could be exposed, it would become possible to trivially convert a Finch request into a lazy enumerable:

Stream.unfold(Finch.stream(MyFinch, request), fn
  {:done, _} -> nil
  {next, conn} -> {next, Finch.stream_recv(conn)}
end)
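To see the shape of that Stream.unfold/2 call without any changes to Finch, here is a small runnable stand-in. `FakeConn` is hypothetical: its "conn" is just the list of response parts not yet delivered, so `stream_recv/1` only illustrates the caller-driven contract, not real socket handling.

```elixir
defmodule FakeConn do
  # Hypothetical stand-in for the proposed API. The "conn" is simply the
  # list of response parts that have not been handed to the caller yet.
  def stream(parts), do: stream_recv(parts)

  # Each call hands out one part plus the continuation, or :done at the end.
  def stream_recv([]), do: {:done, []}
  def stream_recv([part | rest]), do: {part, rest}
end

parts = [{:status, 200}, {:headers, []}, {:data, "a"}, {:data, "b"}]

# Same unfold as in the proposal: each step emits one part and pulls the next.
lazy =
  Stream.unfold(FakeConn.stream(parts), fn
    {:done, _conn} -> nil
    {next, conn} -> {next, FakeConn.stream_recv(conn)}
  end)

# Only as many parts as requested are pulled through stream_recv/1.
first_two = Enum.take(lazy, 2)
all = Enum.to_list(lazy)
```

Note that the first part is fetched eagerly when the stream is built, because the initial accumulator is the result of the first call.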

"One possible issue (and I’m sure there are more) is that I’m not sure what to do if the caller exits/crashes, which means the resource isn’t explicitly released back to NimblePool, exception telemetry can’t be sent, etc."

Yeah, I think the "there may be dragons" of several different issues is the biggest blocker and it all derives (correct me if I am wrong @sneako) from the decision that Finch was designed to avoid an additional process for HTTP 1.

Perhaps we should introduce Finch.async_stream which explicitly incurs an additional process for HTTP 1 (and none for HTTP 2), allowing us to provide streaming but without the single-process constraints we have today.

Thanks for commenting, José! I think that a Finch.async_stream that explicitly incurs an additional process may be a good compromise to make, especially in a streaming scenario. Generally speaking, you’ll use streaming APIs to avoid having to make lots of separate requests, so the overhead of the additional process would be comparatively small.

I do think it’s important that a new API doesn’t make assumptions about how the stream is consumed, i.e. returns something that can be wrapped in an enumerable if desired, or partially consumed and then only the data wrapped in an enumerable, or partially consumed and then canceled, etc.

Thanks for continuing the discussion and digging a bit deeper into how this actually could be done!

José is correct in that we were aiming to minimize inter-process copying by avoiding additional processes. Now that Finch has become quite popular and we have Req built on top, I don't want to stand in the way of allowing Req to achieve its full potential.

I have begun to prefer Req myself whenever my use case drifts even slightly from Finch's originally intended one; it is such a pleasant library to use.

I have to mention I do not have much time to dedicate to this work but I am committed to helping contributors flesh this out and arrive at a solution. I would love to see an initial PR ASAP.

The proposed API:

Stream.unfold(Finch.async_stream(MyFinch, request), fn
  {:done, _} -> nil
  {next, conn} -> {next, Finch.stream_recv(conn)}
end)

looks great to me, but raises a few questions.

  • What would happen if the connection is closed (or any other kind of error occurs) in between calls to stream_recv/1? Should the function actually be async_stream!/2 or return :ok/:error tuples for each part of the body? We should probably offer both variants. I imagine the ! variant will be used more frequently, but the tuple variant will make it easier to work with the incomplete response if desired.
  • RE: "The existing callback-based/blocking-loop API could be implemented on top of this." If we are still planning to introduce another process to implement this (I don't think we have come up with a way around that yet), then using stream_recv to implement the current callback-based approach would really slow things down.

"I have to mention I do not have much time to dedicate to this work but I am committed to helping contributors flesh this out and arrive at a solution. I would love to see an initial PR ASAP."

Happy to do what I can to keep this moving forward!

  • What would happen if the connection is closed (or any other kind of error occurs) in between calls to stream_recv/1? Should the function actually be async_stream!/2 or return :ok/:error tuples for each part of the body? We should probably offer both variants. I imagine the ! variant will be used more frequently, but the tuple variant will make it easier to work with the incomplete response if desired.

I’d think that, should the connection close prematurely between calls, the next call to stream_recv/1 would return something indicating the closure or error. I’d be fine with wrapping everything in an :ok/:error tuple to make things explicit:

{:ok, {:status, pos_integer()}}
{:ok, {:headers, list()}}
{:ok, {:data, binary()}}
{:ok, :done}
{:error, :conn_closed}
{:error, Exception.t()}
# etc. 
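If both variants were offered, the raising one could be a thin wrapper over the tuple one. A sketch under that assumption; `StreamResult.unwrap!/1` is a hypothetical helper, and the tuple shapes are the ones listed above:

```elixir
defmodule StreamResult do
  # Hypothetical helper: turns a tagged result into a bare part, or raises.
  def unwrap!({:ok, part}), do: part

  # Exceptions are re-raised as they are.
  def unwrap!({:error, exception}) when is_exception(exception), do: raise(exception)

  # Any other reason (for example :conn_closed) becomes a RuntimeError.
  def unwrap!({:error, reason}), do: raise("stream failed: " <> inspect(reason))
end

results = [{:ok, {:status, 200}}, {:ok, {:data, "hello"}}, {:ok, :done}]
parts = Enum.map(results, &StreamResult.unwrap!/1)
```

A caller that wants to keep the incomplete response would work with the tuples directly and stop at the first `{:error, _}`.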

For async_stream! to work, I think we’d have to link the process to the caller. We might be able to look to the Task API for some inspiration. I'll discuss this more below.

  • RE: "The existing callback-based/blocking-loop API could be implemented on top of this." If we are still planning to introduce another process to implement this (I don't think we have come up with a way around that yet), then using stream_recv to implement the current callback-based approach would really slow things down.

Agreed, I would not want to implement the current callback approach with a process-based one. I was only thinking of that while considering a single-process stream_recv. I'm not sure what changes to the core of the library will be required if we choose to incur an additional process for streaming, but whatever those may be, they should be made with care not to impact performance for existing use-cases.


If we are using another process, I wonder if an API modeled after Task.async/Task.await makes the most sense:

areq = Finch.async_request(request, MyFinch, opts)

receive do
  {^areq, {:status, status}} -> ...
  {^areq, {:headers, headers}} -> ...
  {^areq, {:data, data}} -> ...
  {^areq, :done} -> ...
end

case Finch.await_request(areq) do
  :ok -> ...
  {:error, exception} -> ...
end

As José mentioned, for HTTP2 this wouldn't require an additional process since this is effectively what is already happening. For HTTP1, it seems like this could be a fairly simple wrapper around Task, though I may be missing something.
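A rough, self-contained sketch of how the caller side of such a message-based API might look. `AsyncSketch` is hypothetical and replays a fixed list of parts from a spawned process instead of performing a request; only the `{ref, part}` message shape is taken from the example above.

```elixir
defmodule AsyncSketch do
  # Hypothetical stand-in for an async request: a linked process sends each
  # response part to the caller, tagged with a unique reference.
  def async_request(parts) do
    caller = self()
    ref = make_ref()

    spawn_link(fn ->
      Enum.each(parts, fn part -> send(caller, {ref, part}) end)
      send(caller, {ref, :done})
    end)

    ref
  end

  # Caller side: pieces the response together from the tagged messages.
  def collect(ref, acc \\ []) do
    receive do
      {^ref, :done} -> Enum.reverse(acc)
      {^ref, part} -> collect(ref, [part | acc])
    after
      1_000 -> {:error, :timeout}
    end
  end
end

ref = AsyncSketch.async_request([{:status, 200}, {:headers, []}, {:data, "hi"}])
response_parts = AsyncSketch.collect(ref)
```

Because the sender never waits for the caller, this shape has no back-pressure: parts accumulate in the caller's mailbox until they are received.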

I'd be very curious to hear from @wojtekmach as well on this!

I implemented the process-based approach in a naive way just to play around with the API a bit.

That looks nice. The only concern is that there is no back-pressure but I am not sure we can have back-pressure with the HTTP 2 version anyway. So as a next step I would recommend pushing this all the way up to the adapter, so we support HTTP2? Btw I think HTTP2 wouldn't start a task (which means we probably don't want to make this task-based? but we can figure it out as we go).

Yep, this was only meant to be a proof-of-concept to play with the API. Perhaps the HTTP/1 adapter will use a Task, but it’s redundant for HTTP/2 which is already sending essentially these exact messages.

As for back-pressure, I’m not sure it’s possible. Bytes are being sent and buffered somewhere — if not in our process queue, then somewhere in our network stack, right? (I’m honestly not 100% sure, paging @whatyouhide for network insights 😄)

At the moment I’m thinking the API should only be two functions, async_request and cancel_async_request. It will be documented to explain that it incurs an additional process per request for HTTP/1 but none for HTTP/2, and the calling process is responsible for piecing together the response. I don’t think an await is useful here since everything of interest will already have been sent.

Beautiful.

What kind of back-pressure do you guys mean? I thought at least when using Mint.HTTP2.stream/2, waiting to call that exerts back-pressure against the server.

So, I spent more time than I care to admit yesterday trying to figure out why a test wasn't working. I posted about it on the forum this morning -- if anyone here has any insight, I'd greatly appreciate it. 🙂

Writing down some thoughts while watching Cars with my son for the 20th time 😅

  • Calling async_request returns a request_ref, an opaque type containing enough info to track the request.
  • request_ref will likely be a {reference, {scheme, host, port}} tuple. The SHP (scheme, host, port) is needed to find the applicable pool, and the ref to differentiate between requests. The ref will be created immediately rather than reusing the one returned by Mint, since we don’t have that until the request is sent.
  • Finch.Pool will add two additional callbacks, async_request and cancel_async_request.
  • HTTP/1 impl: the pool can store some additional state to map request refs to their processes, which allows us to cancel in-flight requests. I’m thinking right now that it’s not necessary to link any processes — the async request can monitor the caller and cancel itself if caller goes down, and the pool will monitor the async request to remove it from state.
  • HTTP/2 impl: the pool here wraps a gen_statem that already relies on message passing for requests, but blocks in a receive loop. It’s already the calling process receiving those messages, so we’ll just need to transform them to the new {request_ref, response} shape and can then use all the existing gen_statem machinery with minimal change.
    • Telemetry is one thing that might trip things up here. Right now it all happens outside of the gen_statem, which certainly makes things cleaner, but it might have to move into the gen_statem so that it can be available for async requests. This is only a sticking point for HTTP/2 — all telemetry should work as expected for HTTP/1 since it’s just the existing request callback wrapped in a process.
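The monitoring idea in the HTTP/1 bullet can be sketched without a pool. Everything here is hypothetical (`CancelSketch`, the `{:cancel, ref}` message, the fixed list of parts); it only shows an async request process that monitors its caller and stops when the caller goes down or asks it to cancel.

```elixir
defmodule CancelSketch do
  # Hypothetical stand-in for an HTTP/1 async request process. It is not
  # linked to the caller; it monitors the caller instead.
  def start(caller, parts) do
    ref = make_ref()

    pid =
      spawn(fn ->
        monitor = Process.monitor(caller)
        loop(caller, ref, monitor, parts)
      end)

    {ref, pid}
  end

  # Nothing left to deliver: tell the caller and exit normally.
  defp loop(caller, ref, _monitor, []), do: send(caller, {ref, :done})

  defp loop(caller, ref, monitor, [part | rest]) do
    receive do
      # The caller went away, so there is nobody left to deliver to.
      {:DOWN, ^monitor, :process, _pid, _reason} -> :caller_down
      # The caller asked for this request to be cancelled.
      {:cancel, ^ref} -> :cancelled
    after
      # No stop signal pending: deliver the next part and continue.
      0 ->
        send(caller, {ref, part})
        loop(caller, ref, monitor, rest)
    end
  end
end

{ref, pid} = CancelSketch.start(self(), [{:status, 200}, {:data, "x"}])
```

A pool that also monitors `pid` could notify the caller if the request process exits abnormally, which is the part this sketch leaves out.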

If anything here stands out as silly/incorrect, please let me know! Hoping to have a draft PR ready this weekend.

Looking good! I think request_ref should likely be something like {reference, HttpPoolMod, internal_state}. For http1, you need the pid of the spawned process, for http2 the pid of where the multiplexing is happening? This way you don't need to store additional state in the pool.

Great point! Something that’s unclear to me is whether that would be as resilient to errors — there are various ways the async request could fail and we want all of them to result in a {request_ref, {:error, error}} message to the caller. If we have some state in the pool, the pool can monitor the async request process and notify the caller if it exits abnormally. Without any knowledge of in-progress async requests, I think we’d have to leave error handling up to the async request process. It could wrap the underlying request in a try/catch, but it would miss exits sent from outside the process, I believe.

I would say it depends on the failure? Unexpected failures should 100% lead to crashes/exits/broken-links. The other failures we may want to bubble up and those by definition we will be taking into account in the code. But I think we will know for sure once we have the code, so I wouldn't worry about deciding it upfront. The direction is right. :)

(Posting here for the sake of anyone who might not be following #228.)

The PR is now at a good point for review, I think. Would appreciate any feedback that folks may have time to provide!

Closed via #228