oleg-st/ZstdSharp

ReadAsync doesn't return until output buffer filled or stream ends

LinusKardellInfobric opened this issue · 11 comments

I'm trying to use this library to add compression to a network protocol, and the idea I had is to simply wrap the protocol in a ZSTD-compressed stream. Each time one end sends a message, it would flush out a compressed block. But I'm running into the problem that DecompressionStream.ReadAsync doesn't return the data it has already decompressed, even if no more data is currently available on the inner stream. ReadAsync only returns once output.pos >= output.size, or innerStream.ReadAsync returns 0 (when the stream has ended). So there seems to be no way to fetch all currently available data from a DecompressionStream when reading from a never-ending stream (unless you read one byte at a time from the DecompressionStream, but that would presumably be inefficient), and there seems to be no other way to access streaming decompression in this library without using unsafe code.

I don't see any clear, clean way of having it check if there is any more available data (there is no generic DataAvailable on Stream, and no way to use timeout or cancellation with NetworkStream), so the only solutions I can think of would be to:

  1. Have ReadAsync not await innerStream.ReadAsync if it already has some decompressed data, and instead possibly leave it in progress to be picked up by a subsequent call. Something like:

            private Task<int> readInProgress;

            var output = new ZSTD_outBuffer_s { pos = 0, size = (nuint)buffer.Length };
            while (output.pos < output.size)
            {
                if (input.pos >= input.size)
                {
                    // Reuse a read left pending by a previous call, or start a new one
                    var task = readInProgress != null ? readInProgress : innerStream.ReadAsync(inputBuffer, 0, inputBufferSize, cancellationToken);
                    int bytesRead = 0;
                    if (output.pos == 0)
                    {
                        // Nothing to return yet, so wait for input
                        bytesRead = await task.ConfigureAwait(false);
                        task = null;
                    }
                    else if (task.IsCompleted)
                    {
                        bytesRead = task.Result;
                        task = null;
                    }

                    readInProgress = task;

                    if (task != null)
                    {
                        // Read still pending: return what has been decompressed so far
                        break;
                    }

                    if (bytesRead == 0)
                    {
                        if (checkEndOfStream && lastDecompressResult != 0)
                            throw new EndOfStreamException("Premature end of stream");

                        break;
                    }

                    input.size = (nuint)bytesRead;
                    input.pos = 0;
                }

                lastDecompressResult = DecompressStream(ref output, buffer.Span);
            }

    Though that's just something I quickly threw together and haven't tested. Read would also need to be modified for mixed sync/async reads to be safe, and I have no idea how to deal with a pending read in Dispose.

  2. Have ReadAsync never call innerStream.ReadAsync when output.pos > 0 (which means it would call innerStream.ReadAsync at most once on each call). This would have the drawback that ReadAsync might not fill the output buffer even if there is more data available.

  3. Provide a thin, non-stream-based, safe wrapper around Decompressor.DecompressStream. Something like:

            public (nuint Status, nuint InputPos, nuint OutputPos) DecompressStream(Span<byte> inputBuffer, Span<byte> outputBuffer)
            {
                fixed (byte* inputBufferPtr = inputBuffer)
                fixed (byte* outputBufferPtr = outputBuffer)
                {
                    var input = new ZSTD_inBuffer_s { size = (nuint)inputBuffer.Length, pos = 0, src = inputBufferPtr };
                    var output = new ZSTD_outBuffer_s { size = (nuint)outputBuffer.Length, pos = 0, dst = outputBufferPtr };
                    var returnValue = DecompressStream(ref input, ref output);
                    return (returnValue, input.pos, output.pos);
                }
            }
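The trade-off in option 2 can be illustrated without ZstdSharp. The following is only a toy sketch: `wire`, `InnerRead`, and `ReadAvailable` are hypothetical names, a plain byte copy stands in for decompression, and each queued chunk is assumed to fit in the 16-byte input buffer. It drains buffered input first and performs an inner read only when it has nothing to return yet.

```csharp
using System;
using System.Collections.Generic;

// Each queued array stands for one flushed message arriving on the wire.
var wire = new Queue<byte[]>(new[] { new byte[] { 1, 2, 3 }, new byte[] { 4, 5 } });
int innerReads = 0;

// Input buffer state kept between calls, like input.pos / input.size in the issue.
var input = new byte[16];
int inputPos = 0, inputSize = 0;

// Hypothetical stand-in for innerStream.Read: one chunk per call, 0 at end of stream.
int InnerRead(byte[] dest)
{
    innerReads++;
    if (wire.Count == 0) return 0;
    byte[] chunk = wire.Dequeue();
    chunk.CopyTo(dest, 0);
    return chunk.Length;
}

// Option 2 policy: return as soon as any output exists; read only when empty-handed.
int ReadAvailable(byte[] output)
{
    int produced = 0;
    while (true)
    {
        // "Decompress" (here: copy) whatever is still buffered from an earlier read.
        int n = Math.Min(inputSize - inputPos, output.Length - produced);
        Array.Copy(input, inputPos, output, produced, n);
        inputPos += n;
        produced += n;
        if (produced > 0 || output.Length == 0) return produced;

        inputSize = InnerRead(input);
        inputPos = 0;
        if (inputSize == 0) return 0; // end of stream
    }
}

var buffer = new byte[16];
int first = ReadAvailable(buffer);   // one message, although the buffer has room for both
int second = ReadAvailable(buffer);
int third = ReadAvailable(buffer);   // end of stream
Console.WriteLine($"{first} {second} {third} reads={innerReads}"); // prints "3 2 0 reads=3"
```

The first call returns 3 bytes even though 2 more were already queued, which is the drawback named in option 2. In exchange, no call ever waits on the inner stream while holding data it could have returned.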

I like the second option; we can increase the inputBuffer via bufferSize if we want to get more data per ReadAsync call.

I think in that option ReadAsync would also need to start out by calling DecompressStream without reading any new input, to check if there is any data still buffered in the decompression context or in inputBuffer which didn't fit in output on the previous call (at least if the previous call ended with output.pos == output.size).

Maybe something like (again, something I just quickly scratched together without testing):

            EnsureNotDisposed();

            var output = new ZSTD_outBuffer_s { pos = 0, size = (nuint)buffer.Length };

            while (true)
            {
                // Check if we already have some data available to decompress
                lastDecompressResult = DecompressStream(ref output, buffer.Span);
                if (output.pos > 0)
                {
                    return (int)output.pos;
                }

                // Otherwise, read some more data
                if (input.pos >= input.size)
                {
                    int bytesRead;
                    if ((bytesRead = await innerStream.ReadAsync(inputBuffer, 0, inputBufferSize, cancellationToken)
                         .ConfigureAwait(false)) == 0)
                    {
                        if (checkEndOfStream && lastDecompressResult != 0)
                        {
                            throw new EndOfStreamException("Premature end of stream");
                        }

                        return 0;
                    }

                    input.size = (nuint)bytesRead;
                    input.pos = 0;
                }
            }

input.pos >= input.size checks that all input has been consumed and that we need to read more.

True, but if I understand correctly it is still possible that the ZSTD_DCtx_s has consumed all compressed data from input, but could not fit all the decompressed data in output on the previous call.
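To make that concern concrete, here is a toy sketch rather than real zstd behavior: a hypothetical `Decode` function expands every input byte into four output bytes and keeps its own pending state, loosely playing the role of the decompression context. With a 3-byte output buffer, the first call consumes all input yet still owes one byte, so a check on input.pos >= input.size alone would go straight to reading more input.

```csharp
using System;

// Toy decoder state: output that is owed but has not been delivered yet.
byte pendingValue = 0;
int pendingCount = 0;

// Hypothetical stand-in for DecompressStream: advances inPos and outPos as it works.
void Decode(byte[] input, ref int inPos, int inSize, byte[] output, ref int outPos)
{
    while (outPos < output.Length)
    {
        if (pendingCount == 0)
        {
            if (inPos >= inSize) return;   // nothing buffered and no input left
            pendingValue = input[inPos++]; // consume one input byte...
            pendingCount = 4;              // ...which owes four output bytes
        }
        output[outPos++] = pendingValue;
        pendingCount--;
    }
}

var input = new byte[] { 7 };
int inPos = 0;
var small = new byte[3];

int outPos = 0;
Decode(input, ref inPos, input.Length, small, ref outPos);
bool inputConsumed = inPos >= input.Length; // all input is gone
int stillPending = pendingCount;            // but one byte is still owed

// A second call with no new input still yields data.
int outPos2 = 0;
Decode(input, ref inPos, input.Length, small, ref outPos2);
Console.WriteLine($"consumed={inputConsumed} pending={stillPending} second={outPos2}");
// prints "consumed=True pending=1 second=1"
```

This is why the proposed loop calls the decoder once before deciding whether to read: the decoder, not the input buffer, knows whether output is still pending.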

We can add this before ReadAsync:

    // frame is completely decoded so flush it before next read
    if (lastDecompressResult == 0 && output.pos > 0) {
        break;
    }

I'm afraid I don't see the purpose of checking if a frame has ended there.

A frame ends at the flush/close of the compression stream, so a flush in compression will cause a flush in decompression.

Flushing the compression would end the current block (ZSTD_e_flush), but not necessarily end the current frame (ZSTD_e_end). And it is still possible that when a frame ends the sender immediately starts a new frame (so the start of the new frame is received at the same time as the end of the previous frame), or that data remains buffered in the ZSTD_DCtx_s.

Should I make a pull request with my solution?

Flush before ReadAsync then?

    if (output.pos > 0) {
        break;
    }