Observable from async function
DarkSatyr opened this issue · 1 comments
Short description of missing functionality:
Starting from RxSwift 6.5 there is a very helpful asObservable() extension function on AsyncSequence that converts an async sequence to an Observable. But if I want to convert just a single async function to an Observable, I need to perform 2 steps: wrap the function in an Async[Throwing]Stream, and then convert that sequence to an Observable with asObservable(). Most async functions are 'singles' by nature, yet with the unfolding initializer I need to add the take(1) operator after asObservable() or the Observable won't complete at all. The other option is to use the initializer that takes a continuation, but then I need to write a bunch of imperative-style code to produce a terminating Async[Throwing]Stream. My proposal is to add an Observable.create factory method which accepts an async function as input and produces an Observable.
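To illustrate why take(1) is needed with the unfolding initializer, here is a minimal sketch in plain Swift (no RxSwift involved). The names CallCounter and collectFirstThree are hypothetical and exist only for this illustration. The point is that the unfolding closure is invoked again for every requested element, so a function that always returns a value produces an endless stream unless the consumer limits it:

```swift
// Hypothetical helper that counts how many times the stream asks for a value.
actor CallCounter {
    private var count = 0

    func next() -> Int {
        count += 1
        return count
    }
}

// Hypothetical demo: collects the first three values of an unfolding-based stream.
func collectFirstThree() async throws -> [Int] {
    let counter = CallCounter()

    // The unfolding closure runs once per requested element.
    // The stream only ends when the closure returns nil or throws.
    let stream = AsyncThrowingStream<Int, Error>(unfolding: {
        await counter.next()
    })

    var received: [Int] = []
    // prefix(3) plays the same role here that take(1) plays on the Observable side.
    for try await value in stream.prefix(3) {
        received.append(value)
    }
    return received
}
```

Calling collectFirstThree() returns [1, 2, 3]: the closure was asked for a value three times, and without prefix(3) the loop would never finish.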
Short code example of how you would like to use the API:
extension Observable {
    static func create(asyncFunc: @escaping () async throws -> Element) -> Observable<Element> {
        Observable.create { observer in
            let task = Task {
                do {
                    let value = try await asyncFunc()
                    observer.onNext(value)
                    observer.onCompleted()
                } catch {
                    observer.onError(error)
                }
            }
            return Disposables.create { task.cancel() }
        }
    }
}
The reason why I need this functionality:
This will simplify conversion and eliminate explicit use of Swift Streams
Code I have right now:
AsyncThrowingStream(unfolding: asyncFunc)
    .asObservable()
    .take(1)
or
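AsyncThrowingStream { continuation in
    Task {
        do {
            let result = try await provider.fetchSomeData(account: "")
            continuation.yield(result)
            continuation.finish()
        } catch {
            // Forward the failure; otherwise the stream never terminates on error
            continuation.finish(throwing: error)
        }
    }
}
.asObservable()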
asyncFunc must be a @Sendable closure for this to work with strict concurrency checking enabled.
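One way to read this comment is as a change to the parameter type only. The sketch below applies it to the factory proposed in the issue; it is not an existing RxSwift API, and it assumes RxSwift 6.5 or later is available as a dependency. It only marks the closure as @Sendable. Whether the observer capture inside the Task also passes strict checking depends on the RxSwift version and compiler settings, so further annotations may still be needed:

```swift
import RxSwift

extension Observable {
    // Proposed factory from the issue, with the closure marked @Sendable.
    // Not part of RxSwift; shown only as a sketch.
    static func create(
        asyncFunc: @escaping @Sendable () async throws -> Element
    ) -> Observable<Element> {
        Observable.create { observer in
            // Run the async function in a Task and forward its outcome.
            let task = Task {
                do {
                    let value = try await asyncFunc()
                    observer.onNext(value)
                    observer.onCompleted()
                } catch {
                    observer.onError(error)
                }
            }
            // Disposing the subscription cancels the underlying Task.
            return Disposables.create { task.cancel() }
        }
    }
}
```

The call site stays the same as in the original proposal, for example Observable.create(asyncFunc: { try await provider.fetchSomeData(account: "") }), as long as the closure only captures Sendable values.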