忽略事件
ignoreElements
忽略掉所有的.next 事件,只接受.completed 事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.ignoreElements()
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
skip(n)
忽略前 n 个信号
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.skip(2)
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
skipWhile
当遇到第一个不满足条件的事件之后,就不再忽略任何事件了
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.skipWhile { $0 == "T2" }
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
skipUntil
和 skipWhile 类似,不过忽略条件为另外一个事件序列中的事件
let tasks = PublishSubject<String>()
            let bag = DisposeBag()
            let bossIsAngry = PublishSubject<Void>()
            tasks.skipUntil(bossIsAngry)
                .subscribe { print($0) }
                .disposed(by: bag)
            tasks.onNext("T1")
            tasks.onNext("T2")
            bossIsAngry.onNext(())
            tasks.onNext("T3")
            tasks.onNext("T4")
            tasks.onCompleted()
            
            
            
            
 
distinctUntilChanged
忽略序列中连续重复的事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.distinctUntilChanged()
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onNext("T4")
tasks.onNext("T4")
 
获取事件
elementAt(n)
取下标为 n 的元素
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.elementAt(0)
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
filter
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.filter { $0 == "T1" }
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
take(n)
除了选择订阅单一事件之外,我们也可以选择一次性订阅多个事件,例如,选择序列中的前两个事件
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.take(2) 
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
takeLast(count)
取倒数的 n 个数
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.takeLast(1)
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
takeWhile
只要条件为 true 就一直订阅下去”这样的概念
let tasks = PublishSubject<String>()
let bag = DisposeBag()
tasks.takeWhile { $0 != "T2" } 
    .subscribe { print($0) }
    .disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
 
takeUntil
发生之前一直订阅,条件为另外一个信号
let tasks = PublishSubject<String>()
let bossHasGone = PublishSubject<Void>()
let bag = DisposeBag()
tasks.takeUntil(bossHasGone).subscribe {
    print($0)
}
.disposed(by: bag)
tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onNext("T4")
bossHasGone.onNext(())
tasks.onNext("T5")
tasks.onCompleted()
 
toArray
把 Observable<T> 中所有的事件值,在订阅的时候,打包成一个 Array返回给订阅者。只会在结束后计算.
Observable.of(1, 2, 3)
        .toArray()
        .subscribe(onSuccess: { arr in
            print(arr) // [1,2,3]
        }, onError: { error in
            print(error)
        })
        .disposed(by: bag)
    // 把原始Observable中所有的事件值变成一个数组,只要原始Observable不结束,这个转换就不会发生
    let numbers = PublishSubject<Int>()
    numbers.asObservable()
        .toArray()
        .subscribe(onSuccess: { arr in
            print("subject:\(arr)") // [1, 2, 4]
        }, onError: { error in
            print(error)
        }).disposed(by: bag)
    numbers.onNext(1)
    numbers.onNext(2)
    numbers.onNext(4)
    numbers.onCompleted()
 
scan
数据两两进行运算,如进行求和运算等。 每次有事件的时候都会执行.
 let obs = Observable<Int>.create { (ob) -> Disposable in
        ob.onNext(1)
        ob.onNext(2)
        ob.onNext(3)
        ob.onNext(4)
        ob.onNext(5)
        return Disposables.create()
    }
    obs.scan(0, accumulator: { (sum, new) -> Int in
        sum + new
    }).subscribe(onNext: { value in
        print(value) 
    }).disposed(by: bag)
 
map
self.example("map") {
    Observable.of(1, 2, 3).map {
        value in value * 2
    }.subscribe(onNext: {
        print($0)
    }).disposed(by: bag)
}
 
其他
share
 let obs = Observable<Int>.create { (ob) -> Disposable in
        ob.onNext(1)
        ob.onNext(2)
        ob.onNext(3)
        ob.onNext(4)
        ob.onNext(5)
        return Disposables.create()
    }.share()
    obs.subscribe(onNext: { code in
        print("1---\(code)")
    }).disposed(by: bag)
 
一个比较好的介绍说明: https://www.jianshu.com/p/08b30b4181ea
publish
publish 用于向所有订阅者“统一”发布事件
let interval = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()
 
加上 publish,我们发现不管从何时开始监听,统一时间监听到的信号是相同的(晚监听的不会去从 1 开始)
multicast
可以让原事件序列中的事件通过另外一个 subject 对象代为传递
let supervisor = PublishSubject<Int>()
_ = supervisor.subscribe(onNext: {
    print("Supervisor: event \($0)") })
let interval = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .multicast(supervisor)
 
个人理解为,对原始信号进行监听。
replay(n)
订阅时,回放历史事件
buffer
可以想象的是,随意使用replayAll很容易导致问题,特别是当历史事件很多的时候,就非常容易导致资源被耗尽。为此,我们还可以为事件的回放在特定的时间范围里,指定一个最大事件数量。这个operator叫做buffer。我们直接来看代码:
let interval = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .buffer(timeSpan: 4, count: 2, scheduler: MainScheduler.instance)
 
要注意的是,使用了buffer之后,interval就不再是connectable observable了。它有三个参数:
- timeSpan:缓冲区的时间跨度,尽管interval每隔1秒钟发生一次事件,但经过buffer处理后,就变成了最长timeSpan秒发生一次事件了,事件的值,就是由所有缓存的事件值构成的数组。如果timeSpan过后没有任何事件发生,就向事件的订阅者发送一个空数组;
 
- count:缓冲区在timeSpan时间里可以缓存的最大事件数量,当达到这个值之后,buffer就会立即把缓存的事件用一个数组发送给订阅者,并重置timeSpan;
 
- scheduler:表示Observable事件序列发生在主线程,在后面的内容里,我们还会专门介绍RxSwift中的各种scheduler;
 
flatMap / flatMapLast
struct Player {
var score: Variable<Int>
}
let John = Player(score: Variable(70))
let Jole = Player(score: Variable(90))
let players = PublishSubject<Player>()
players.asObservable()
    .flatMap { 
        $0.score.asObservable()
    }
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: self.bag)
players.onNext(John)
players.onNext(Jole)
John.score.value = 100
Jole.score.value = 101
若使用.flatMapLatest,则打印
 
把它原序列中的每个事件,都变换成一个 Observable。
因此,再加入了 Jole 之后,flatMap 一共变换出了两个 Observable, 经过 flatMap 合并过的 Observable会按发生的顺序,反映 John 和 Jole 中的所有事件。
当原序列中有新事件发生的时候,flatMapLatest 就会自动取消上一个事件的订阅,然后转换到新事件的订阅。