implement streaming decode?
vn971 opened this issue · 14 comments
Would it be possible to implement a streaming decoder for zstd?
An example interface can be seen here:
https://github.com/alexcrichton/xz2-rs/blob/3400ef960644db934ec5aa845c6e520257a7fb6f/src/read.rs#L111
Given something that has a Read interface r: R
, it creates a decoder that itself can be read impl<R: Read>
. Pretty similar to a pipe in unix: | gunzip | ...
The pure-rust library libflate implements the same thing for gzip: https://github.com/sile/libflate/blob/master/src/gzip.rs#L903
I would be happy to use this interface for an ArchLinux tool that I'm developing (rua).
I think this would be equivalent to using FrameDecoder::new() and then FrameDecoder::init()
(see here:
Line 81 in 20b32ff
I chose this two step process so that the FrameDecoder struct can be reused so allocations only have to be made once, and not for each new file/stream you want to decode.
For how that would look like you could take a look at the src/bin/zstd.rs file. It shows an example of how to use the streaming API.
If i missed something that libflate's interface does what my crate doesn't I would be glad to add it. In general streaming zstd decoding is no problem (it was kinda designed with that use case in mind I think)
Hmm, interesting. I wrote some code for that method, it compiles, the "Result" of reset
is Ok
. But the contents of the underlying .tar.zst archive is said to be empty. If this .tar.zst archive is unpacked using unzstd
, the compressed tar archive is readable and non-empty.
Just in case, here's the code that I used: https://github.com/vn971/rua/pull/94/files#diff-0148a000aa2614c1a3d5382e420cad58R44
*thinking ...
I think I'll need to create a "minimal viable example" to understand if there's buggy coding on my side or smth else.
I just had a look at your code. Maybe I should make the usage of this crate more clear.
You need:
- A decoder
- reset/init the decoder with a Reader
- call the decode_blocks() or decode_from_to() functions
You are missing step 3) right now
I'd recommend to look at the src/bin/zstd.rs file. It showcases how I imagined someone might use this library. Note that you can pass BlockDecodingStrategy::All to decode_blocks to just decode the whole stream instead of the BlockDecodingStrategy::UptoBytes(batch_size) which limits how many bytes will be decoded at once.
@KillingSpark I would argue that this is not really a "streaming read", to be honest. Streaming generally means that nothing gets evaluated before you ask for it. Also, the interface of Read
does not have a concept of calling external functions to ensure availability of data, so if something is truly Read
, it can supply the data when asked for.
On practical terms, if I create the decoded reader in e.g. libflate
or xz2
, it is available immediately, nothing big gets decoded in-memory instantly. When I start reading the stream, however, it gets evaluated. Is this kind of thing technically possible - or desired - for zstd?
BTW, unix pipes work similarly in that regard. If you execute unxz big_archive.pkg.tar.xz | head --lines=10
, it will only read the amount of data to fill the whole pipe buffer with data. (A pipe buffer is by default around 65k.) When this limit is reached, a write
attempt by the source will block. But that is off-topic of course - just mentioning because it might be interesting.
What do you think of such a "streaming"/Read
interface overall?
Ah I see. That should be easily doable with a wrapper around the existing FrameDecoder. I'll add this to this crate when I get around to it. (I'd be happy to accept a pull request too if you implement it before I do)
Something along the lines of the following should do the trick:
(I just threw this together in github so it likely wont compile)
struct StreamingDecoder{
decoder: ruzstd::frame_decoder::FrameDecoder,
reader: &dyn std::io::Read,
}
impl Read for StreamingDecoder {
fn read(&mut self, data: &u[8]) -> Result<usize> {
if self.decoder.can_collect() == 0 && self.decoder.is_finished() {
// return an EOF if the frame is finished and all remaining bytes have been read
}
if self.decoder.can_collect() >= data.len() {
// happy, enough bytes are already buffered
self.decoder.read(data)
}else{
// need to decode more bytes if possible
self.decoder.decode_blocks(self.reader, BlockDecodingStrategy::UptoBytes(data.len())?;
self.decoder.read(data)
}
}
}
So I did this here. I hope this does what you expected from the FrameDecoder :)
Thanks a lot! It works. Will you make a release?
Unrelated: I noticed that it takes around 56 seconds on my computer to decompress a 630Mb (tar) archive. The command-line zstd
takes around 580 ms to do the job. If packed as xz
, it takes around 15 seconds to decompress (last part is not relevant to zstd, just thinking aloud).
The current StreamingDecoder can probably be optimized, though it probably needs careful thinking and analysis to figure out where/how exactly.
It's good-enough for my current use case though, for now, I think.
Yep will make a new release and push it to crates.io. Just wanted to wait and see if this is what you wanted :)
As for the performance: This seems weird. Can you test this with the FrameDecoder way to confirm it has to do with the StreamingDecoder wrapper? I did some (very basic, just on a few files) benchmarking and "only" experienced a 3x slowdown compared to zstd nothing close to 100x
Did you use the --release flag to run your test? Using this lib in debug mode is pretty slow.
@KillingSpark oh I feel very stupid, indeed I tested with debug
🤦♂️.
I didn't notice the error immediately because I was testing xz
as well, and failed to understand that it binds to native and thus shouldn't have a perf difference.
Anyway, compiled with --release
it's pretty good. Comparable to the 3x mentioned in the README and by you here now.
No worries, happy you get the same speeds as I do. 3x is still not very good but I think it's an acceptable price for getting rid of a C dependency
Thanks again!