odis-labs/streaming

API renaming: of_file

Closed this issue · 10 comments

How about renaming of_file to something like file_lines (which conveys its meaning more explicitly), and letting of_file return just the raw chunks read by input, to enable other segmentations/primary parsing?

rizo commented

Thanks for the suggestion!

I have thought about this. The reason Stream.of_file currently reads lines is that, in my opinion, that's the most immediately useful and common use case. The current version of streaming does not attempt to expose multiple, flexible IO streams. I'd like to address this in the next release, though.
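
For example, the common line-oriented case looks like this today (a quick sketch; it assumes Stream.fold takes the accumulator first, as elsewhere in the library):

(* Count the lines of a file with the current line-oriented API. *)
let line_count =
  Stream.of_file "data.txt"
  |> Stream.fold (fun n _line -> n + 1) 0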

For a more complete solution, you're right: we need to expose streams over chunked inputs. I'm hesitant to rename of_file to file_lines without considering how other IO operations would be integrated. In particular, we would certainly also want to write lines, chars, chunks, etc.

Maybe something like:

  • Stream.File.read_lines
  • Stream.File.write_lines
  • Stream.File.read_chars
  • Stream.File.write_chars
  • ...

would result in a better API?
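
As a rough signature sketch (nothing final, just mirroring the names above):

module File : sig
  val read_lines : string -> string Stream.t
  (* Stream the lines of the file at the given path. *)

  val write_lines : string -> string Stream.t -> unit
  (* Consume a stream, writing each element as a line. *)

  val read_chars : string -> char Stream.t
  val write_chars : string -> char Stream.t -> unit
end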

rizo commented

I'll do some research in this direction and post my thoughts here.

pveber commented

[...] would result in a better API?

Yes, definitely.

One thing I don't understand yet is why read_lines is a stream rather than a source. Is it to avoid a systematic call to Stream.from?

rizo commented

Stream is a convenience module, so, yes – instead of calling Stream.from and Stream.into directly, it provides of_ and to_ functions.
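
For example, assuming the usual Source.list constructor, these two definitions would be equivalent:

(* The convenience form and its explicit source-based spelling. *)
let s1 = Stream.of_list [1; 2; 3]
let s2 = Stream.from (Source.list [1; 2; 3])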

In some situations it's also possible to implement a slightly more efficient stream "source", given that an actual Source.t has to allocate an option value per element.
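
To make the allocation concrete, here is a minimal counting source; note that pull boxes every element it produces in a Some cell:

(* A source counting from 0 to n - 1: one option (and one pair)
   allocation per element pulled. *)
let counting n =
  Source.make
    ~init:(fun () -> 0)
    ~pull:(fun i -> if i < n then Some (i, i + 1) else None)
    ()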

rizo commented

Now that I'm thinking about this, for the IO/File functions mentioned above, providing decoupled sources and sinks is probably the better decision. It makes them much easier to reuse in pipelines and provides nice support for leftover sources with Stream.run.

For example, reading a header from a CSV file with:

let (header, csv_data) =
  Stream.run
    ~from:(Source.File.lines "foo.csv")
    ~via:(Flow.map parse_header)
    ~into:Sink.first
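
(Here header would hold the parsed first line, since Sink.first needs only one element, and csv_data the leftover source carrying the remaining lines, ready to be fed into another pipeline.)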

pveber commented

That looks nice to me! This is a bit off-topic, but I tried to define a source for gzipped files:

let gziped_file ?(buffer_size = 1_024) path =
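  (* Uses the Gzip module (e.g. from the camlzip library). *)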
  let buf = Bytes.create buffer_size in
  let init () = Gzip.open_in path in
  let pull ic =
    match Gzip.input ic buf 0 buffer_size with
    | 0 -> None
    | n -> Some (String.init n (Bytes.get buf), ic)
  in
  let stop ic = Gzip.close_in ic in
  Source.make ~init ~pull ~stop ()

Does it seem correct?

rizo commented

Hi @pveber! Sorry for the delay. I've been thinking about buffered streams recently. To answer your question: yes, I think that's currently the recommended way to create a gzip source. Your implementation is a well-behaved source: (1) it's lazy, since the file is only opened on init; (2) it's safe, since it closes the channel on stop; and (3) it buffers the input, exposing a configurable buffer size.

I like these three properties, but I think this could be further improved in future versions of streaming.

In particular, it's common to keep the buffer allocation local to the source, as in your example, but in practice the string produced by this source (from the buffer) will very likely be put into yet another buffer in a sink. This means unnecessary copying from the source buffer into the sink buffer.

Can we avoid this? I think we can, if we allow the source to write data directly into a buffer provided by the sink. I'm not certain yet what exactly this interface would look like, but I have been experimenting with some encodings.
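
One possible shape, as a very rough sketch (entirely hypothetical; nothing like this exists in streaming today):

(* A source that fills a caller-provided buffer instead of allocating a
   fresh string per element: [read st buf pos len] writes at most [len]
   bytes into [buf] starting at [pos] and returns the number of bytes
   written, with 0 signalling end of input. *)
type 's buffered_source = {
  init : unit -> 's;
  read : 's -> bytes -> int -> int -> int;
  stop : 's -> unit;
}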

My current sources of inspiration are C#'s System.IO.Pipelines (https://blog.marcgravell.com/2018/07/pipe-dreams-part-1.html) and ZIO's ZTransducer type (it's similar to Streaming.Flow but works on "chunks").

rizo commented

I wrote some notes on lazy resource initialization for streaming: https://github.com/odis-labs/streaming/blob/7a42d248cf0ec687c34890e18d6495189c0fb196/NOTES.md#lazier-resource-initialization

Would love to know if you have any thoughts.

With my proposal, your gziped_file source example would look like:

let gziped_file ?(buffer_size = 1_024) path =
  let buf = Bytes.create buffer_size in
  let init = lazy (Gzip.open_in path) in
  let pull ic =
    match Gzip.input (Lazy.force ic) buf 0 buffer_size with
    | 0 -> None
    | n -> Some (String.init n (Bytes.get buf), ic)
  in
  let stop ic = if Lazy.is_val ic then Gzip.close_in (Lazy.force ic) in
  Source.make ~init ~pull ~stop ()

Are there any drawbacks with this approach?

pveber commented

Sorry, I forgot to answer that. Wrapping the resource in a lazy container adds a tiny overhead when reading, but I think the benefit is worth it.

rizo commented

Closing this for now. I'm working on a buffered API for IO operations that will replace the existing file functions.