Reactive Stream

A Stream wrapper that reports read and write progress through IProgress<StreamProgress> and IObservable<StreamProgress>, with separate speed throttling for reads and writes.

Example Usage:

Report file download progress:

  • progress using IProgress<StreamProgress>
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

var client   = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length   = long.Parse(response.Content.Headers
                                  .First(h => h.Key.Equals("Content-Length"))
                                  .Value
                                  .First());

var progress = new Progress<ReactiveStream.StreamProgress>(p => Console
                                                              .WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}" +
                                                                         $"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()} " +
                                                                         $"({p.Percentage:N2}%) " +
                                                                         $"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"));

// Write to a file: a MemoryStream cannot hold a 4.69 GiB download.
var stream = new ReactiveStream(File.Create("ubuntu-22.04.3-desktop-amd64.iso"), progress: progress, totalLength: length);
// Progress is reported only after you subscribe to the stream.
stream.Subscribe();
await response.Content.CopyToAsync(stream);

NOTE: to set a custom report interval when using only IProgress<StreamProgress>, apply the Sample() operator to the stream.
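The note above can be read in more than one way. The sketch below shows one possible interpretation and is not a documented recipe: the stream is created without the progress argument, its observable side is sampled with the Rx Sample operator, and each sampled value is forwarded to the IProgress<StreamProgress> handler by hand. The constructor arguments and StreamProgress members are the ones used elsewhere in this README. The one-second interval, the output file name, and the variable names are illustrative assumptions.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reactive.Linq;
using SakontStack.ReactiveStream;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

using var client   = new HttpClient();
using var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length = long.Parse(response.Content.Headers
                                .First(h => h.Key.Equals("Content-Length"))
                                .Value
                                .First());

// Declared as IProgress<T> because Progress<T> implements Report explicitly.
IProgress<ReactiveStream.StreamProgress> progress =
    new Progress<ReactiveStream.StreamProgress>(p =>
        Console.WriteLine($"Downloaded {p.Bytes} bytes ({p.Percentage:N2}%)"));

// Illustrative output file; no progress argument is passed to the constructor,
// because the sampled subscription below feeds the handler instead.
await using var file = File.Create("ubuntu-22.04.3-desktop-amd64.iso");
var stream = new ReactiveStream(file, totalLength: length);

using var subscription = stream
    .Sample(TimeSpan.FromSeconds(1))       // assumed interval: one report per second
    .Subscribe(p => progress.Report(p));   // forward each sampled value to the handler

await response.Content.CopyToAsync(stream);
```

Whether passing progress to the constructor and sampling can be combined in a single call is not stated in this README, so the sketch keeps the two mechanisms separate.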

  • progress using IObservable<StreamProgress>
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

var client   = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length   = long.Parse(response.Content.Headers
                                  .First(h => h.Key.Equals("Content-Length"))
                                  .Value
                                  .First());

// Write to a file: a MemoryStream cannot hold a 4.69 GiB download.
var stream = new ReactiveStream(File.Create("ubuntu-22.04.3-desktop-amd64.iso"), totalLength: length);
stream
  .Sample(TimeSpan.FromSeconds(0.25)) // Report progress at most every 0.25 seconds
  .Do(p => Console
            .WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}"+
                       $"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()} "+
                       $"({p.Percentage:N2}%) "+
                       $"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"))
  .Subscribe();
await response.Content.CopyToAsync(stream);
  • setting download speed limits:
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

var client   = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length   = long.Parse(response.Content.Headers
                                  .First(h => h.Key.Equals("Content-Length"))
                                  .Value
                                  .First());

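// Write to a file: a MemoryStream cannot hold a 4.69 GiB download.
var stream = new ReactiveStream(File.Create("ubuntu-22.04.3-desktop-amd64.iso"), totalLength: length,
                                configureStream: s =>
                                                 {
                                                     // Limit writes to 1 MiB/sec (1024 * 1024 bytes per second)
                                                     s.WriteSpeedLimit = 1024 * 1024;
                                                 });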
stream
  .Sample(TimeSpan.FromSeconds(0.25)) // Report progress at most every 0.25 seconds
  .Do(p => Console
            .WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}"+
                       $"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()} "+
                       $"({p.Percentage:N2}%) "+
                       $"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"))
  .Subscribe();
await response.Content.CopyToAsync(stream);
  • changing speed limits in realtime:
stream.ModifyOptions(x => x.WriteSpeedLimit = null);
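To place the one-liner above in context, here is a minimal sketch of a download whose write limit is changed while the copy is running. ModifyOptions, WriteSpeedLimit, and configureStream are taken from this README. The sketch assumes that the limit is expressed in bytes per second, that assigning null lifts the limit, and that ModifyOptions may be called safely while a copy is in flight. The delays and the output file name are illustrative.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reactive.Linq;
using System.Threading.Tasks;
using SakontStack.ReactiveStream;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

using var client   = new HttpClient();
using var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length = long.Parse(response.Content.Headers
                                .First(h => h.Key.Equals("Content-Length"))
                                .Value
                                .First());

// Illustrative output file.
await using var file = File.Create("ubuntu-22.04.3-desktop-amd64.iso");
var stream = new ReactiveStream(file, totalLength: length,
                                configureStream: s =>
                                                 {
                                                     // Start with a 1 MiB/sec write limit
                                                     s.WriteSpeedLimit = 1024 * 1024;
                                                 });
stream.Subscribe();

// Start the copy without awaiting it, so the limit can be changed meanwhile.
var download = response.Content.CopyToAsync(stream);

await Task.Delay(TimeSpan.FromSeconds(10));                      // illustrative delay
stream.ModifyOptions(x => x.WriteSpeedLimit = 4 * 1024 * 1024);  // raise the limit to 4 MiB/sec

await Task.Delay(TimeSpan.FromSeconds(10));
stream.ModifyOptions(x => x.WriteSpeedLimit = null);             // assumed to remove the limit

await download;
```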

ByteSizeLib is not a dependency of this library. It is used in these examples only to print byte sizes in a readable format.

Example output:

...
Downloaded 4.7 MiB/4.69 GiB (0.10%) 680 KiB/sec
Downloaded 5.03 MiB/4.69 GiB (0.10%) 1016 KiB/sec
Downloaded 5.04 MiB/4.69 GiB (0.10%) 1 MiB/sec
Downloaded 5.36 MiB/4.69 GiB (0.11%) 328 KiB/sec
Downloaded 5.7 MiB/4.69 GiB (0.12%) 680 KiB/sec

API:

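The ReactiveStream class derives from Stream and implements IObservable<ReactiveStream.StreamProgress>, so one instance can be passed to any API that expects a Stream and can also be subscribed to for progress updates.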

public class ReactiveStream : Stream, IObservable<StreamProgress>
{
    //* ... *//
}
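To illustrate what combining the two roles means, the following is a small hypothetical sketch using only the base class library. It is not the library's implementation: CountingStream and RecordingObserver are invented names, the sketch publishes only a running count of written bytes rather than a StreamProgress value, and it does no throttling.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical illustration only: a stream that is also an observable of bytes written.
public sealed class CountingStream : Stream, IObservable<long>
{
    private readonly Stream _inner;
    private readonly List<IObserver<long>> _observers = new();
    private long _written;

    public CountingStream(Stream inner) => _inner = inner;

    // IObservable<long>: register an observer and return a handle that removes it.
    public IDisposable Subscribe(IObserver<long> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    // Stream: forward the write, then publish the running byte count.
    public override void Write(byte[] buffer, int offset, int count)
    {
        _inner.Write(buffer, offset, count);
        _written += count;
        foreach (var observer in _observers.ToArray()) observer.OnNext(_written);
    }

    // The remaining Stream members simply delegate to the wrapped stream.
    public override int Read(byte[] buffer, int offset, int count) => _inner.Read(buffer, offset, count);
    public override void Flush() => _inner.Flush();
    public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);
    public override void SetLength(long value) => _inner.SetLength(value);
    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => _inner.CanSeek;
    public override bool CanWrite => _inner.CanWrite;
    public override long Length => _inner.Length;
    public override long Position { get => _inner.Position; set => _inner.Position = value; }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _remove;
        public Unsubscriber(Action remove) => _remove = remove;
        public void Dispose() => _remove();
    }
}

// Hypothetical observer that records every value it receives.
public sealed class RecordingObserver : IObserver<long>
{
    public List<long> Values { get; } = new();
    public void OnNext(long value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With this shape, code that only knows about Stream (for example CopyTo) writes through the wrapper unchanged, while a subscriber receives a value after each write and stops receiving them once the handle returned by Subscribe is disposed.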