Rx 中的时间相关操作符

本文最后更新于 2021年4月4日 晚上

这篇文章主要是之前看 RxSwift 的第十一章时候的笔记, 方便日后再看.

通过缓冲操作符来控制过去的和新的元素在什么时候以什么样的方式交给观察者.

缓冲和延后观察

这类操作符可以保证未来的观察者可以观察到这个序列中在开始观察之前就已经发射的元素.

共有两个, 一个是 replay(_:), 一个是 replayAll.

如下是示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
let elementsPerSecond = 1
let maxElements = 5
let replayedElements = 1
let replayDelay: TimeInterval = 3

//...

// 一个定时发射元素的序列
let sourceObservable = Observable<Int>.create { observer in
var value = 1
let timer = DispatchSource.timer(
interval: 1.0 / Double(elementsPerSecond),
queue: .main) {
if value <= maxElements {
observer.onNext(value)
value = value + 1
}
}
return Disposables.create {
timer.suspend()
}
}

如果要让一个序列具有 replay 的能力, 就直接对这个序列使用 replay 操作符, 结果是生成一个新序列:

1
sourceObservable.replay(replayedElements) // 参数为缓冲区大小(单位:个)

每当有新观察者观察生成的序列时, 观察者始终都会观察到指定个数的之前元素, 然后再是接收到新元素.

对某个序列使用了该操作符, 就相当于是形成了一个和 ReplaySubject 序列形态时候的序列一样.

当在应用了 replay 形成的新序列上开始观察时, 却发现没有任何的元素输出, 甚至是 create 都没有调用, 为什么呢?

replay(_:) creates a connectable observable, you need to connect it to its underlying source to start receiving items. If you forget this, subscribers will never receive anything.

这里的 Connectable Observable 指的是一类特殊的 Observable, 无论它们有多少个观察者, 除非调用 connect, 否则它们不会发射元素(不会向 Observer 上添加任何元素). 只要是返回 ConnectableObservable 类型对象, 就需要对这些对象调用 connect 后新的序列才会发射元素:

1
sourceObservable.connect().disposed(by: _disposeBag)

replayAll() 操作符也是类似, 只不过它会将所有元素都放入缓冲区, 这样的话在使用上就需要注意了, 如果不知道原序列的元素数量, 要考虑好了再使用, 否则有可能造成形成非常大的缓冲区.

delaySubscription 可以将观察者的观察时间延后, 下面两段代码的最终效果是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 使用传统方法来延后观察:

DispatchQueue.main.asyncAfter(deadline: .now() + 2.5, execute: { [weak self] in
guard let self = self else { return }
sourceObservable
.subscribe(onNext: {
print("观察2: \($0)")
})
.disposed(by: self._disposeBag)
})


// 使用 delaySubscription 操作符延后观察:

sourceObservable
.delaySubscription(2.5, scheduler: MainScheduler.instance)
.subscribe(onNext: {
print("观察2: \($0)")
})
.disposed(by: _disposeBag)

在 RxSwift 中, 有 hot 和 cold 序列之分, 其中 cold 序列是指当有观察者观察后才会开始发射, 而 hot 是没有观察者的时候也在发射, 比如上面的 Connectable Observable, 只要调用 connect 后就在发射元素了, 而不管有没有观察者.

只有在 hot 序列上使用 delaySubscription 才会产生 “跳过” 一些元素. 如果在 cold 序列上调用, 则和直接观察没有任何区别.

时间偏移(穿越)

和穿越剧一样, 有一类操作符可以将序列上的元素按时间轴偏移.

delay 操作符就可以将元素按时间轴向后偏移, 还是利用上面的 connectable 序列来演示:

1
2
3
4
5
6
sourceObservable
.delay(5.0, scheduler: MainScheduler.instance)
.subscribe(onNext: {
print("观察: \($0)")
})
.disposed(by: _disposeBag)

上述效果就是将元素在时间轴上向后偏移, 且元素的相对位置是不变的.

Timer 类操作符

App 中经常会有定时器的需求, 无论是单次定时还是循环定时, iOS 平台上的 NSTimer 和 DispatchSource 都有一些或多或少的缺点, 故在 RxSwift 中就提供了一些既简单又高效的计时器操作符可供使用.

第一种是 interval 操作符, 用它建立一个循环定时器:

1
let obv = Observable<Int>.interval(1.0, scheduler: MainScheduler.instance)

这个序列就可以每隔一秒发射一个元素, 并且它的开始条件是有观察者观察, 即它是 cold 序列. 另外要手动停止计时也非常简单, 因为观察结果是一个 Disposable, 故直接调用 dispose 就可以停止计时了.

这里通过它就可以实现一个简单的计时器了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

private var _timerDisp: Disposable!

private var _timerObv: Observable<Int>!

// ...

_timerObv = Observable<Int>.interval(1.0, scheduler: MainScheduler.instance)

// ...

/// 启动计时器
private func startTimer() {
_timerDisp = _timerObv.subscribe(onNext: {
print("定时器信号: \($0)")
})
}

/// 停止计时器
private func stopTimer() {
_timerDisp.dispose()
_timerDisp = nil
}

// 通过一个按钮的操作就可以切换开始计时和停止计时, 很方便:

@objc private func btnClicked() {
if _timerDisp == nil {
startTimer()
} else {
stopTimer()
}
}

在实际使用上来比较的话, 这点代码就能够实现一个简单的计时器, 效率上来说就提高了很多很多!

另外一个操作符是 timer, 这个操作符相比 interval 而言功能更多, 可以设置停止时间, 也可以设置是否循环.

该操作符的第一个参数是产生第一个元素的计时时长(从开始观察触发序列发动算起), 第二个参数是循环发射, 如果不设置, 则只发射第一次信号然后完成. 第三个参数是 scheduler.

1
timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = default, scheduler: SchedulerType) -> Observable<Int>

如果这样调用: timer(1, period: 1, scheduler: MainScheduler.instance), 则就和 interval(1.0, scheduler: MainScheduler.instance) 是相同的效果.

最后一个操作符是 timeout, 它的用处估计是在网络操作的时候对序列进行超时限制, 如果超过时间的话, 这个操作符产生的新序列上就会出现超时的 error, 从而可以进行对应处理.


Rx 中的时间相关操作符
https://blog.rayy.top/2019/01/27/2019-39-Rx-time-based-operators/
作者
貘鸣
发布于
2019年1月27日
许可协议