If you're using RxSwift, you may have encountered situations where the built-in operators do not bring the exact functionality you want. The RxSwift core is being intentionally kept as compact as possible to avoid bloat. This repository's purpose is to provide additional convenience operators.
RxSwiftExt now requires Swift 3 and RxSwift 3.0.0 or later. If your project is running on Swift 2.x, please use version 1.2
of the library.
Using Swift 3:
pod "RxSwiftExt"
If you use Swift 2.x:
pod "RxSwiftExt", '1.2'
Add this to your Cartfile
github "RxSwiftCommunity/RxSwiftExt"
RxSwiftExt is all about adding operators to RxSwift! Currently available operators:
- unwrap
- ignore
- ignoreWhen
- Observable.once
- distinct
- mapTo
- not
- Observable.cascade
- retry
- repeatWithBehavior
- catchErrorJustComplete
- pausable
Unwrap optionals and filter out nil values.
Observable.of(1,2,nil,Int?(4))
.unwrap()
.subscribe { print($0) }
Next(1)
Next(2)
Next(4)
Ignore specific elements.
Observable.from(["One","Two","Three"])
.ignore("Two")
.subscribe { print($0) }
Next(One)
Next(Three)
Completed
Ignore elements according to closure.
Observable<Int>
.of(1,2,3,4,5,6)
.ignoreWhen { $0 > 2 && $0 < 6 }
.subscribe { print($0) }
Next(1)
Next(2)
Next(6)
Completed
Send a next element exactly once to the first subscriber that takes it. Further subscribers get an empty sequence.
let obs = Observable.once("Hello world")
print("First")
obs.subscribe { print($0) }
print("Second")
obs.subscribe { print($0) }
First
Next(Hello world)
Completed
Second
Completed
Pass elements through only if they were never seen before in the sequence.
Observable.of("a","b","a","c","b","a","d")
.distinct()
.subscribe { print($0) }
Next(a)
Next(b)
Next(c)
Next(d)
Completed
Replace every element with the provided value.
Observable.of(1,2,3)
.mapTo("Nope.")
.subscribe { print($0) }
Next(Nope.)
Next(Nope.)
Next(Nope.)
Completed
Negate booleans.
Observable.just(false)
.not()
.subscribe { print($0) }
Next(true)
Completed
Sequentially cascade through a list of observable, dropping previous subscriptions as soon as an observable further down the list starts emitting elements.
let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()
Observable.cascade([a,b,c])
.subscribe { print($0) }
a.onNext("a:1")
a.onNext("a:2")
b.onNext("b:1")
a.onNext("a:3")
c.onNext("c:1")
a.onNext("a:4")
b.onNext("b:4")
c.onNext("c:2")
Next(a:1)
Next(a:2)
Next(b:1)
Next(c:1)
Next(c:2)
Repeats the source observable sequence using given behavior in case of an error or until it successfully terminated.
There are four behaviors with various predicate and delay options: immediate
, delayed
, exponentialDelayed
and
customTimerDelayed
.
// in case of an error initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = sampleObservable.retry(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.0), scheduler: delayScheduler)
.subscribe(onNext: { event in
print("Receive event: \(event)")
}, onError: { error in
print("Receive error: \(error)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive error: fatalError
Repeats the source observable sequence using given behavior when it completes. This operator takes the same parameters as the retry operator.
There are four behaviors with various predicate and delay options: immediate
, delayed
, exponentialDelayed
and customTimerDelayed
.
// when the sequence completes initial delay will be 1 second,
// every next delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so multiplier 1.0 means, delay will doubled
_ = completingObservable.repeatWithBehavior(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.2), scheduler: delayScheduler)
.subscribe(onNext: { event in
print("Receive event: \(event)")
})
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Completes a sequence when an error occurs, dismissing the error condition
let _ = sampleObservable
.do(onError: { print("Source observable emitted error \($0), ignoring it") })
.catchErrorJustComplete()
.subscribe {
print ("\($0)")
}
Next(First)
Next(Second)
Source observable emitted error fatalError, ignoring it
Completed
Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true
.
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let trueAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in true }
let falseAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in false }
let pauser = Observable.of(trueAtThreeSeconds, falseAtFiveSeconds).merge()
let pausedObservable = observable.pausable(pauser)
let _ = pausedObservable
.subscribe { print($0) }
Next(2)
Next(3)
More examples are available in the project's Playground.
This library belongs to RxSwiftCommunity.
RxSwiftExt is available under the MIT license. See the LICENSE file for more info.