Support IObservable<'T>
Opened this issue · 4 comments
ptrelford commented
It might be interesting to be able to convert a Stream to an IObservable<'T>, i.e.
module Stream =
let toObservable (stream:Stream<'T>) : System.IObservable<'T> =
{ new System.IObservable<'T> with
member __.Subscribe(observer:System.IObserver<'T>) =
let (Stream streamf) = stream
let (bulk, _) = streamf (fun value -> observer.OnNext value; true)
bulk ()
observer.OnComplete()
{ new System.IDisposable with member __.Dispose() = ()}
}
let obs = [|1..10|] |> Stream.ofArray |> Stream.toObservable
obs.Subscribe(fun x -> printfn "%d" x)
palladin commented
Thanks, it will be interesting to have support for Stream.ofObservable/toObservable.
palladin commented
Semantically Stream and Observable are quite different but I think it is possible to offer support for bridge functions. I'll experiment with some impl. ideas and see how it works as a whole.
theburningmonk commented
👍
ptrelford commented
Simple implementation of fromObservable using a BlockingCollection:
let fromObservable (observable:System.IObservable<'T>) : Stream<'T> =
let xs = new System.Collections.Concurrent.BlockingCollection<'T>()
let observer =
{ new System.IObserver<'T> with
member __.OnNext(x) = xs.Add(x)
member __.OnCompleted() = xs.CompleteAdding()
member __.OnError(_) = failwith "Not implemented"
}
let disposable = observable.Subscribe(observer)
let iter iterf =
let bulk () =
let mutable next = true
while not xs.IsCompleted && next do
next <- iterf (xs.Take())
disposable.Dispose()
let next =
let flag = ref true
fun () ->
if xs.IsCompleted || not !flag then
disposable.Dispose()
false
else
flag := iterf (xs.Take())
true
(bulk, next)
Stream iter