Observable 合并
startWith
需要注意的是 startWith 中的事件值的类型,和它后续的事件值类型,必须是相同的
enum Condition: String {
case cellular = "Cellular"
case wifi
case none
}
let bag = DisposeBag()
let request = Observable<String>.create { (ob) -> Disposable in
ob.onNext("Reponse from server.")
ob.onCompleted()
return Disposables.create()
}
request.startWith(Condition.wifi.rawValue)
.subscribe(onNext: { dump($0) })
.disposed(by: bag)
concat
把两个并行的 Observable 合并起来串行处理。可以合并多个 observer ,假设合并了[A、B]
- A 执行完后才会对 B 进行监听
- A 和 B 都执行完才会执行 subscribe 的 Completed、Disposed
- Error 的情况
- B 如果在等待状态(还没有监听),不会触发 subscribe 的 onError
- 在监听状态会触发 onError -> onDisposed
enum E: Error {
case demo
}
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
let sequence = Observable.concat([queueA.asObserver(), queueB.asObserver()])
sequence.subscribe(onNext: { msg in
dump(msg)
}, onError: {
print($0)
}, onCompleted: {
print("Completed")
}, onDisposed: {
print("Disposed")
}).disposed(by: bag)
queueA.onNext("A1")
queueA.onNext("A11")
queueB.onNext("B1")
queueA.onNext("A2")
queueA.onNext("A21")
queueA.onCompleted()
queueB.onNext("B2")
queueB.onNext("B3")
queueB.onCompleted()
}
## merge
- 只能合并相同类型的信号,多个信号并行。
- 可以指定最大并行的数量
- 并行中信号任意一个执行 onError,会触发 subscribe 的 error->completed
self.example("merge 并行") {
enum E: Error {
case demo
}
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
let sequence = Observable.of(queueA.asObserver(), queueB.asObserver()).merge()
sequence.subscribe(onNext: { msg in
dump(msg)
}, onError: {
print($0)
}, onCompleted: {
print("Completed")
}, onDisposed: {
print("Disposed")
}).disposed(by: bag)
queueA.onNext("A1")
queueA.onNext("A2")
queueB.onNext("B1")
queueA.onNext("A3")
queueA.onCompleted()
queueB.onNext("B2")
queueB.onCompleted()
Observables 中的事件合并
combineLatest
对最新的事件进行合并,可以合并不同类型的事件
enum E: Error {
case demo
}
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable.combineLatest(queueA, queueB, resultSelector: { (a, b) -> String in
a + "," + b
}).subscribe(onNext: { msg in
dump(msg)
}).disposed(by: bag)
queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueB.onNext("B2")
如果中间有完成的,则将会记录它最后一个是事件。与后续事件进行合并
queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueA.onCompleted()
queueB.onNext("B2")
queueB.onNext("B3")
如果发生 error 则立即结束
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onNext("A2")
queueA.onError(E.demo)
queueB.onNext("B2")
queueB.onNext("B3")
zip
顾名思义,拉链。合并并消费
enum E: Error {
case demo
}
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable.zip(queueA, queueB, resultSelector: { (a, b) -> String in
a + "," + b
}).subscribe(onNext: { msg in
dump(msg)
}).disposed(by: bag)
queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueB.onNext("B2")
queueA.onCompleted()
queueB.onCompleted()