RxSwift 中的 Scheduler 笔记

本文最后更新于 2021年4月4日 晚上

这篇是看 RxSwift 中的 Scheduler 的一些笔记, 方便日后查看.

Scheduler

首先要明确 Scheduler 到底是什么?

a scheduler is a context where a process takes place. This context can be a thread, a dispatch queue or similar entities, or even an NSOperation which is used inside the OperationQueueScheduler.

Scheduler 是一个过程执行所处的上下文环境, 这个上下文可以是线程, DispatchQueue, NSOperation(OperationQueueScheduler) 等.

意味着 Scheduler 是一种对 iOS 多线程编程 API 的高级抽象, 而非仅仅建立在对线程的抽象上, 所以不要认为 Scheduler 仅仅是对线程的抽象.

注意 Scheduler 并非线程, 且它和线程也并非一对一的关系.

MainScheduler 就是主线程上的 Scheduler.

使用操作符切换 Scheduler

切换 Scheduler 的意思就是切换某任务的执行环境(执行上下文).

subscribeOn 用于切换观察者的创建块的执行上下文, 即 create 块的执行上下文.

下面这段代码中的 create 块执行时会阻塞执行线程:

1
2
3
4
5
6
7
8
let fruit = Observable<String>.create { observer in
observer.onNext("[apple]")
sleep(2)
observer.onNext("[pineapple]")
sleep(2)
observer.onNext("[strawberry]")
return Disposables.create()
}

如果对这个序列进行观察, 则会造成主线程阻塞(如果 subscribe 操作符是在主线程调用的话).

1
2
3
4
fruit
.dump()
.dumpingSubscription()
.disposed(by: _disposeBag)

其中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
extension ObservableType {
func dump() -> RxSwift.Observable<Self.E> {
return self.do(onNext: { element in
let threadName = getThreadName()
print("\(secondsElapsed())s | [副作用(do)] \(element) received on \(threadName)")
})
}

func dumpingSubscription() -> Disposable {
return self.subscribe(onNext: { element in
let threadName = getThreadName()
print("\(secondsElapsed())s | [观察(subscribe)] \(element) received on \(threadName)")
})
}
}

如果把序列生成代码的执行上下文改变的话, 就可以避免这样的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
let globalScheduler = ConcurrentDispatchQueueScheduler(queue: .global())

fruit
.subscribeOn(globalScheduler)
.dump()
.dumpingSubscription()
.disposed(by: _disposeBag)

// 上面的代码和如下代码效果一致:

let fruit = Observable<String>.create { observer in
DispatchQueue.global().async {
observer.onNext("[apple]")
sleep(2)
observer.onNext("[pineapple]")
sleep(2)
observer.onNext("[strawberry]")
}
return Disposables.create()
}

fruit
.dump()
.dumpingSubscription()
.disposed(by: _disposeBag)

通过 observeOn(MainScheduler.instance) 可以将观察块的执行上下文进行切换.

一些预定义的 Scheduler

如下是 Rx 预定义的一些 Scheduler:

  • MainScheduler: 主线程上的 Scheduler

  • SerialDispatchQueueScheduler: 基于串行 Dispatch 队列的 Scheduler

  • ConcurrentDispatchQueueScheduler: 基于并行 Dispatch 队列的 Scheduler

  • OperationQueueScheduler: 基于 OperationQueue 的 Scheduler, 可以进行更精细控制, 比如并行任务数等内容.


RxSwift 中的 Scheduler 笔记
https://blog.rayy.top/2019/01/27/2019-41-rxswift-scheduler/
作者
貘鸣
发布于
2019年1月27日
许可协议