API renaming: of_file
How about renaming `of_file` to something like `file_lines` (which conveys its meaning more explicitly), and letting `of_file` return just the chunks read by `input`, to enable other segmentations/primary parsing?
Thanks for the suggestion!

I have thought about this. The reason `Stream.of_file` currently reads lines is that, in my opinion, that's the most immediately useful and common use case. The current version of streaming does not attempt to expose multiple, flexible IO streams; I'd like to address this in the next release.

For a more complete solution, you're right, we need to expose streams with chunked inputs. I'm hesitant to just rename `of_file` to `file_lines` without considering how other IO operations would be integrated. In particular, we would certainly also want to write lines, chars, chunks, etc.
Maybe something like:

- `Stream.File.read_lines`
- `Stream.File.write_lines`
- `Stream.File.read_chars`
- `Stream.File.write_chars`
- ...

would result in a better API?
I'll start doing some research in this direction and will post my thoughts here.
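For concreteness, a rough sketch of what such a module's signature might look like (purely illustrative; none of these names or types are settled):

```ocaml
(* Illustrative only: a possible signature for the proposed Stream.File
   module, assuming the library's existing 'a Stream.t type. *)
module File : sig
  val read_lines : string -> string Stream.t
  val write_lines : string -> string Stream.t -> unit
  val read_chars : string -> char Stream.t
  val write_chars : string -> char Stream.t -> unit
end
```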
> [...] would result in a better API?

Yes, definitely.
One thing I don't understand yet is why `read_lines` is a stream rather than a source. Is it to avoid a systematic call to `Stream.from`?
`Stream` is a convenience module, so, yes – instead of calling `Stream.from` and `Stream.into` directly, it provides `of_` and `to_` functions.
In some situations it's also possible to implement a slightly more efficient stream "source" directly, given that actual `Source.t` values have to allocate an option per element.
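To make the allocation point concrete, a source has roughly the following shape (a sketch based on the `Source.make ~init ~pull ~stop` interface, not the library's exact definition), so every element pulled allocates a `Some (element, state)` pair:

```ocaml
(* Roughly the shape of a source: pull returns the next element together
   with the next state, wrapped in an option; None signals end of input. *)
type 'a source_shape =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source_shape
```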
Now that I'm thinking about this, for the IO/File functions mentioned above, providing decoupled sources and sinks is probably a better decision. It makes them much easier to reuse in pipelines and provides nice support for leftover sources with `Stream.run`.
For example, reading a header from a CSV file with:

```ocaml
let (header, csv_data) =
  Stream.run
    ~from:(Source.File.lines "foo.csv")
    ~via:(Flow.map parse_header)
    ~into:Sink.first
```
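The leftover source could then be consumed by a second run, for example (a sketch; `parse_row` is a hypothetical row parser, and `Sink.list` collects the results):

```ocaml
(* Sketch: consume the leftover source returned by the first run. *)
let rows =
  match csv_data with
  | None -> []
  | Some rest ->
    let (parsed, _leftover) =
      Stream.run ~from:rest ~via:(Flow.map parse_row) ~into:Sink.list
    in
    parsed
```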
That looks nice to me! This is a bit off-topic, but I tried to define a source for gzipped files:
```ocaml
(* Requires the camlzip library for the Gzip module. *)
let gziped_file ?(buffer_size = 1_024) path =
  let buf = Bytes.create buffer_size in
  let init () = Gzip.open_in path in
  let pull ic =
    match Gzip.input ic buf 0 buffer_size with
    | 0 -> None
    | n -> Some (String.init n (Bytes.get buf), ic)
  in
  let stop ic = Gzip.close_in ic in
  Source.make ~init ~pull ~stop ()
```
Does it seem correct?
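For reference, here is how I would consume it (a sketch, assuming `Stream.from` and `Stream.fold` with their usual signatures):

```ocaml
(* Sketch: count the decompressed bytes produced by the source above. *)
let total_bytes =
  gziped_file "data.gz"
  |> Stream.from
  |> Stream.fold (fun n chunk -> n + String.length chunk) 0
```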
Hi @pveber! Sorry for the delay. I've been thinking recently about buffered streams. To answer your question: yes, I think that's currently the recommended way to create a gzip source. Your implementation is a well-behaved source: (1) it's lazy – the file is only opened on `init`; (2) it's safe – it closes the channel on `stop`; and (3) it buffers the input, exposing a configurable buffer size.
I like these three properties, but I think this could be further improved in future versions of streaming.
In particular, it's common to keep the buffer allocation local to the source, as in your example, but in practice the string produced by this source (from the buffer) will very likely be put into yet another buffer in a sink. This means unnecessary copying from the source buffer into the sink buffer.
Can we avoid this? I think we can, if we allow the source to write data directly into a buffer provided by the sink. I'm not certain yet exactly what this interface would look like, but I have been experimenting with some encodings.
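For illustration, one such encoding might look like this (purely a sketch of the idea, not a settled design):

```ocaml
(* Sketch of a buffer-aware source: instead of allocating a string per
   pull, [fill] writes directly into a caller-provided buffer and reports
   how many bytes were written; 0 means end of input. *)
type buffered_source =
  | Buffered : {
      init : unit -> 's;
      fill : 's -> bytes -> pos:int -> len:int -> int * 's;
      stop : 's -> unit;
    } -> buffered_source
```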
My current sources of inspiration are C#'s System.IO.Pipelines (https://blog.marcgravell.com/2018/07/pipe-dreams-part-1.html) and ZIO's `ZTransducer` type (similar to `Streaming.Flow`, but working on "chunks").
I wrote some notes on lazy resource initialization for streaming: https://github.com/odis-labs/streaming/blob/7a42d248cf0ec687c34890e18d6495189c0fb196/NOTES.md#lazier-resource-initialization
Would love to know if you have any thoughts.
With my proposal, your `gziped_file` source example would look like:
```ocaml
let gziped_file ?(buffer_size = 1_024) path =
  let buf = Bytes.create buffer_size in
  let init = lazy (Gzip.open_in path) in
  let pull ic =
    match Gzip.input (Lazy.force ic) buf 0 buffer_size with
    | 0 -> None
    | n -> Some (String.init n (Bytes.get buf), ic)
  in
  (* Only close the channel if the lazy value was ever forced. *)
  let stop ic = if Lazy.is_val ic then Gzip.close_in (Lazy.force ic) in
  Source.make ~init ~pull ~stop ()
```
Are there any drawbacks with this approach?
Sorry, I forgot to answer that. Wrapping the resource in a lazy container adds a tiny overhead when reading, but I think the benefit is worth it.
Closing this for now. I'm working on a buffered API for IO operations that will replace the existing file functions.