忽略事件
ignoreElements
忽略掉所有的.next 事件,只接受.completed 事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.ignoreElements()
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
skip(n)
忽略前 n 个信号
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.skip(2)
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
skipWhile
当遇到第一个不满足条件的事件之后,就不再忽略任何事件了
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.skipWhile { $0 == "T2" }
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
skipUntil
和 skipWhile 类似,不过忽略条件为另外一个事件序列中的事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
let bossIsAngry = PublishSubject<Void>()
tasks.skipUntil(bossIsAngry)
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
bossIsAngry.onNext(())
tasks.onNext("T3")
tasks.onNext("T4")
tasks.onCompleted()
distinctUntilChanged
忽略序列中连续重复的事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.distinctUntilChanged()
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onNext("T4")
tasks.onNext("T4")
获取事件
elementAt(n)
取下标为 n 的元素
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.elementAt(0)
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
filter
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.filter { $0 == "T1" }
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
take(n)
除了选择订阅单一事件之外,我们也可以选择一次性订阅多个事件,例如,选择序列中的前两个事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.take(2)
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
takeLast(count)
取倒数的 n 个数
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.takeLast(1)
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
takeWhile
只要条件为 true 就一直订阅下去”这样的概念
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.takeWhile { $0 != "T2" }
.subscribe { print($0) }
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
takeUntil
发生之前一直订阅,条件为另外一个信号
let tasks = PublishSubject<String>()
let bossHasGone = PublishSubject<Void>()
let bag = DisposeBag()
tasks.takeUntil(bossHasGone).subscribe {
print($0)
}
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onNext("T4")
bossHasGone.onNext(())
tasks.onNext("T5")
tasks.onCompleted()
toArray
把 Observable<T>
中所有的事件值,在订阅的时候,打包成一个 Array返回给订阅者。只会在结束后计算.
Observable.of(1, 2, 3)
.toArray()
.subscribe(onSuccess: { arr in
print(arr) // [1,2,3]
}, onError: { error in
print(error)
})
.disposed(by: bag)
// 把原始Observable中所有的事件值变成一个数组,只要原始Observable不结束,这个转换就不会发生
let numbers = PublishSubject<Int>()
numbers.asObservable()
.toArray()
.subscribe(onSuccess: { arr in
print("subject:\(arr)") // [1, 2, 4]
}, onError: { error in
print(error)
}).disposed(by: bag)
numbers.onNext(1)
numbers.onNext(2)
numbers.onNext(4)
numbers.onCompleted()
scan
数据两两进行运算,如进行求和运算等。 每次有事件的时候都会执行.
let obs = Observable<Int>.create { (ob) -> Disposable in
ob.onNext(1)
ob.onNext(2)
ob.onNext(3)
ob.onNext(4)
ob.onNext(5)
return Disposables.create()
}
obs.scan(0, accumulator: { (sum, new) -> Int in
sum + new
}).subscribe(onNext: { value in
print(value)
}).disposed(by: bag)
map
self.example("map") {
Observable.of(1, 2, 3).map {
value in value * 2
}.subscribe(onNext: {
print($0)
}).disposed(by: bag)
}
其他
share
let obs = Observable<Int>.create { (ob) -> Disposable in
ob.onNext(1)
ob.onNext(2)
ob.onNext(3)
ob.onNext(4)
ob.onNext(5)
return Disposables.create()
}.share()
obs.subscribe(onNext: { code in
print("1---\(code)")
}).disposed(by: bag)
一个比较好的介绍说明: https://www.jianshu.com/p/08b30b4181ea
publish
publish 用于向所有订阅者“统一”发布事件
let interval = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.publish()
加上 publish,我们发现不管从何时开始监听,统一时间监听到的信号是相同的(晚监听的不会去从 1 开始)
multicast
可以让原事件序列中的事件通过另外一个 subject 对象代为传递
let supervisor = PublishSubject<Int>()
_ = supervisor.subscribe(onNext: {
print("Supervisor: event \($0)") })
let interval = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.multicast(supervisor)
个人理解为,对原始信号进行监听。
replay(n)
订阅时,回放历史事件
buffer
可以想象的是,随意使用replayAll很容易导致问题,特别是当历史事件很多的时候,就非常容易导致资源被耗尽。为此,我们还可以为事件的回放在特定的时间范围里,指定一个最大事件数量。这个operator叫做buffer。我们直接来看代码:
let interval = Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.buffer(timeSpan: 4, count: 2, scheduler: MainScheduler.instance)
要注意的是,使用了buffer之后,interval就不再是connectable observable了。它有三个参数:
- timeSpan:缓冲区的时间跨度,尽管interval每隔1秒钟发生一次事件,但经过buffer处理后,就变成了最长timeSpan秒发生一次事件了,事件的值,就是由所有缓存的事件值构成的数组。如果timeSpan过后没有任何事件发生,就向事件的订阅者发送一个空数组;
- count:缓冲区在timeSpan时间里可以缓存的最大事件数量,当达到这个值之后,buffer就会立即把缓存的事件用一个数组发送给订阅者,并重置timeSpan;
- scheduler:表示Observable事件序列发生在主线程,在后面的内容里,我们还会专门介绍RxSwift中的各种scheduler;
flatMap / flatMapLast
struct Player {
var score: Variable<Int>
}
let John = Player(score: Variable(70))
let Jole = Player(score: Variable(90))
let players = PublishSubject<Player>()
players.asObservable()
.flatMap {
$0.score.asObservable()
}
.subscribe(onNext: {
print($0)
})
.disposed(by: self.bag)
players.onNext(John)
players.onNext(Jole)
John.score.value = 100
Jole.score.value = 101
若使用.flatMapLatest,则打印
把它原序列中的每个事件,都变换成一个 Observable。
因此,再加入了 Jole 之后,flatMap 一共变换出了两个 Observable, 经过 flatMap 合并过的 Observable会按发生的顺序,反映 John 和 Jole 中的所有事件。
当原序列中有新事件发生的时候,flatMapLatest 就会自动取消上一个事件的订阅,然后转换到新事件的订阅。