ReactiveX/RxSwift

Observable from async function

DarkSatyr opened this issue · 1 comment

Short description of missing functionality:

Starting with RxSwift 6.5, there is a very helpful asObservable() extension on AsyncSequence that converts an async sequence to an Observable. However, to convert a plain async function (coroutine) to an Observable I need two steps: wrap the function in an Async[Throwing]Stream, then call asObservable() on that stream. Since most coroutines are 'singles' by nature, I also have to add the take(1) operator after asObservable(); otherwise the Observable never completes (when the stream is built with the unfolding initializer). The other option is the initializer that takes a continuation, but then I have to write a chunk of imperative-style code to produce a terminating Async[Throwing]Stream. My proposal is to add an Observable.create factory method that accepts an async function as input and produces an Observable.

Short code example of how you would like to use the API:

extension Observable {
 static func create(asyncFunc: @escaping () async throws -> Element) -> Observable<Element> {
   Observable.create { observer in
     let task = Task {
       do {
         let value = try await asyncFunc()
         observer.onNext(value)
         observer.onCompleted()
       } catch {
         observer.onError(error)
       }
     }
     return Disposables.create { task.cancel() }
   }
 }
}

The reason why I need this functionality:

This will simplify the conversion and eliminate the explicit use of Swift's Async[Throwing]Stream.

Code I have right now:

AsyncThrowingStream(unfolding: asyncFunc)
  .asObservable()
  .take(1)

or

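AsyncThrowingStream { continuation in
    Task {
        do {
            let result = try await provider.fetchSomeData(account: "")
            continuation.yield(result)
            continuation.finish()
        } catch {
            // Forward the error; otherwise the stream would never terminate.
            continuation.finish(throwing: error)
        }
    }
}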
    .asObservable()
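
The need for take(1) in the first variant can be seen without RxSwift at all: the unfolding initializer keeps calling its closure until the closure returns nil or throws, so a single-shot function is invoked again and again. A minimal sketch, assuming a hypothetical fetchValue() stand-in for asyncFunc and a hypothetical collect(limit:) helper that stops the loop by hand:

```swift
// Hypothetical single-shot async function (stand-in for asyncFunc).
func fetchValue() async throws -> Int { 42 }

// Reads from an unfolding stream and stops after `limit` elements.
// Without the explicit break the loop would not end, because the
// unfolding closure never returns nil.
func collect(limit: Int) async throws -> [Int] {
    let stream = AsyncThrowingStream<Int, Error>(unfolding: {
        try await fetchValue()
    })
    var values: [Int] = []
    for try await value in stream {
        values.append(value)
        if values.count == limit { break }
    }
    return values
}
```

Asking for three elements yields three values from the same function, which is why the Observable built from such a stream has to be cut off with take(1).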

asyncFunc must be a @Sendable closure for this to work with Strict Concurrency checking enabled.
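
A sketch of what this comment suggests: the factory from the issue with the parameter marked @Sendable. This is only an illustration of the proposal, not an API that RxSwift is confirmed to ship, and it is an assumption that this annotation alone is enough; depending on the Swift and RxSwift versions, capturing observer inside the Task may still produce concurrency diagnostics.

```swift
import RxSwift

extension Observable {
    // Proposed factory from the issue, with asyncFunc marked @Sendable
    // so it can be captured by the Task's @Sendable operation closure.
    static func create(
        asyncFunc: @escaping @Sendable () async throws -> Element
    ) -> Observable<Element> {
        Observable.create { observer in
            let task = Task {
                do {
                    // Emit the single result, then complete.
                    let value = try await asyncFunc()
                    observer.onNext(value)
                    observer.onCompleted()
                } catch {
                    // Forward a thrown error (including cancellation).
                    observer.onError(error)
                }
            }
            // Disposing the subscription cancels the underlying task.
            return Disposables.create { task.cancel() }
        }
    }
}
```

A call site would then look like Observable<Int>.create(asyncFunc: { try await someAsyncCall() }), where someAsyncCall is a placeholder for any async function.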