nessos/Streams

Support IObservable<'T>

Opened this issue · 4 comments

It might be interesting to be able to convert a Stream to an IObservable<'T>, i.e.

module Stream =
  let toObservable (stream:Stream<'T>) : System.IObservable<'T> =
     { new System.IObservable<'T> with 
        member __.Subscribe(observer:System.IObserver<'T>) =
           let (Stream streamf) = stream
           let (bulk, _) = streamf (fun value -> observer.OnNext value; true) 
           bulk ()
           observer.OnComplete()
           { new System.IDisposable with member __.Dispose() = ()}
     }

let obs = [|1..10|] |> Stream.ofArray |> Stream.toObservable
obs.Subscribe(fun x -> printfn "%d" x)

Thanks, it will be interesting to have support for Stream.ofObservable/toObservable.

Semantically Stream and Observable are quite different but I think it is possible to offer support for bridge functions. I'll experiment with some impl. ideas and see how it works as a whole.

Simple implementation of fromObservable using a BlockingCollection:

let fromObservable (observable:System.IObservable<'T>) : Stream<'T> =
  let xs = new System.Collections.Concurrent.BlockingCollection<'T>()
  let observer = 
     { new System.IObserver<'T> with
        member __.OnNext(x) = xs.Add(x)
        member __.OnCompleted() = xs.CompleteAdding()
        member __.OnError(_) = failwith "Not implemented"
     }
  let disposable = observable.Subscribe(observer)
  let iter iterf =
     let bulk () =
        let mutable next = true
        while not xs.IsCompleted && next do
           next <- iterf (xs.Take())
        disposable.Dispose()
     let next =
        let flag = ref true
        fun () ->
           if xs.IsCompleted || not !flag then 
              disposable.Dispose()
              false
           else 
              flag := iterf (xs.Take())
              true
     (bulk, next)
  Stream iter