RxSwift Ⅱ:操作符和最佳实践

第五章:Filtering 操作符

Ignoring

忽略所有 event,转化为一个 Comletable。

func ignoreElements() -> Completable
Description
Skips elements and completes (or errors) when the receiver completes (or errors). Equivalent to filter that always returns false.
Returns
An observable sequence that skips all elements of the source sequence.

仅发射指定 index 的 event,其余都忽略。

func elementAt(_ index: Int) -> Observable<String>
Description
Returns a sequence emitting only element n emitted by an Observable
Parameters
index
The index of the required element (starting from 0).
Returns
An observable sequence that emits the desired element as its own sole emission.

或者直接使用 .filter ,是 RxSwift 对 Swift 标准库中 .filter 对应版本。具体用法很简单,和 Swift 版一样。

Skipping

.skip(n) 即可跳过前 n 个 event。

.skipWhile 跳过符合条件的 event,但是遇到不符合条件的 event 后,不再跳过该 event 以及后面的 event。

Observable.of(2, 2, 3, 4, 5)
.skipWhile { integer in
print("\(integer) skipWhile")
return integer % 2 == 0
}
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

Console 输出:

2 skipWhile
2 skipWhile
3 skipWhile
3
4
5

.skipUntil(trigger) 持续跳过直到另一个 Observable 生成元素。

Taking

Taking 和 Skipping 是相反的。

.take(n) 只发射前 n 个 event 。

.takeWhile {} 放射符合条件的 event,直到遇到不符合条件的元素,不再发射。

let disposeBag = DisposeBag()

Observable.of(2, 2, 4, 4, 6, 6)
.enumerated()
.takeWhile { index, integer in
integer % 2 == 0 && index < 3
}
.map { $0.element }
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

Console 输出:

2
2
4

.takeUntil(trigger) 类似的,持续发射,直到另一个 Observable 生成元素,后面不再发射。

Distinct

.distinctUntilChanged() 只发射不同于前一个的元素。

Observable.of("A", "A", "B", "B", "A")
.distinctUntilChanged()
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

Console 输出:

A
B
A

distinctUntilChanged(_:) 对阻止重复发射不符合 Equatable 类型的元素时很有用。因为你可以控制前后两个元素是否相等。

第六章:Filtering 实践

上一章介绍了 RxSwift 中函数式编程的观念。操作符操作一个 Observable,输出一个心得 Observable。得益于此,我么可以链式调用操作符。

改进 Combinestagram 项目

就是把上一章的 filtering 操作符应用在项目中。

基于时间的操作符

基于时间的操作符使用 Scheduler

take(_:scheduler:) 是一个过滤操作符,和 take(1)takeWhile(...) 类似。

只会发送指定时间段内的元素。一旦时间点过了,就会发送 .complete 事件。

private func errorMessage() {
alert(title: "No access to Camera Roll",
text: "You can grant access to Combinestagram from the Settings app")
.asObservable()
.take(5.0, scheduler: MainScheduler.instance)
.subscribe(onCompleted: { [weak self] in
self?.dismiss(animated: true, completion: nil)
_ = self?.navigationController?.popViewController(animated: true)
})
.disposed(by: bag)
}

以下是一些合适使用 throttle 的场景:

  • 搜索输入框的订阅,然后发送当前文本内容的 API 请求。通过 throttle , 你可以让用户快速输入,同时仅在用户完成输入时才你的向服务器发送请求。
  • 当用户点击按钮来展示一个 modal view controller 时,你可以阻止双击以及展示两次 modal view controller。通过 throttle 点击事件,可以现实只接收最后一次点击事件,不论用户双击或三击。
  • 如果你只关心用户拖动手势时停留的地方。你可以使用 throttle 当前触摸位置,只考虑那些停止改变的位置的元素。
images.asObservable()
.throttle(0.5, scheduler: MainScheduler.instance)
.subscribe(onNext: { [weak self] photos in
guard let preview = self?.imagePreview else { return }
preview.image = UIImage.collage(images: photos,
size: preview.frame.size)
})
.disposed(by: bag)

第七章:Transforming 运算符

当你决定学习 RxSwift,或许会觉得它是深奥的库。也许让你想起来当初学习 iOS 或者 Swift 的时候。但学习到第七章你应该意识到 RxSwift 并非魔法。它是精心构建的 API,能让极大地提升效率和简化代码。学到这里你应该会感觉不错。

本章你将学习运算符中最重要的一类:transforming 运算符。你将一直使用 transforming,准备 Observable 数据以供订阅者使用。需要强调的是,RxSwift 中的 transforming 运算符与 Swift 标准库之间有相似之处 ,如 map (_:)flatMap (_:)

Transforming 元素

将可观察到的单个元素转换为所有这些元素的数组的一种简便方法是使用 toArraytoArray

let disposeBag = DisposeBag()
Observable.of("A", "B", "C")
.toArray()
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

map 可将每个元素进行 transforming 。map

let disposeBag = DisposeBag()
let formatter = NumberFormatter()
formatter.numberStyle = .spellOut

Observable<NSNumber>.of(123, 4, 56)
.map {
formatter.string(from: $0) ?? ""
}
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 4, 5, 6)
.enumerated()
.map { index, integer in
index > 2 ? integer * 2 : integer
}
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

Transforming 内部 Observable

RxSwift 包含 flatMap 系列中的几个运算符, 允许你深入 Observable, 处理其 Observable 类型的属性。flatMap

struct Student {
var score: BehaviorSubject<Int>
}

let disposeBag = DisposeBag()

let ryan = Student(score: BehaviorSubject(value: 80))
let charlotte = Student(score: BehaviorSubject(value: 90))

let student = PublishSubject<Student>()

student
.flatMap {
$0.score
}
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

student.onNext(ryan)
ryan.score.onNext(85)

student.onNext(charlotte)
ryan.score.onNext(95)

charlotte.score.onNext(100)

What makes

flatMapLatest different is that it will automatically switch to the latest observable and

unsubscribe from the the previous one.

flatMapLatestflatMap 略有不同之处是它会自动切换到最后一个 Observable,注销订阅之前的 Observable。

flatMapLatest

let disposeBag = DisposeBag()

let ryan = Student(score: BehaviorSubject(value: 80))
let charlotte = Student(score: BehaviorSubject(value: 90))

let student = PublishSubject<Student>()

student
.flatMapLatest {
$0.score
}
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)

student.onNext(ryan)

ryan.score.onNext(85)

student.onNext(charlotte)

ryan.score.onNext(95)

charlotte.score.onNext(100)

// Only one thing to point out here that’s different from the previous example of flatMap:
// Changing ryan’s score here will have no effect. It will not be printed out. This is because flatMapLatest has already switched to the latest observable, for charlotte.

观察事件

使用具体化运算符 materialize, 可以将 Observable 发射的每个事件包装成 Observable。比如下面的例子中 $0.score.materialize()BehaviorSubject<Int> 封装为 Observable<Event<Int>> 。这样的话,我的就可以自己处理 .error 和 .complete 事件,而不终结该 Observable 。

对应的我们再使用 .dematerialize() 转回 Observable<Int> ,即可正常订阅。

enum MyError: Error {
case anError
}

let disposeBag = DisposeBag()

let ryan = Student(score: BehaviorSubject(value: 80))
let charlotte = Student(score: BehaviorSubject(value: 100))

let student: BehaviorSubject<Student> = BehaviorSubject(value: ryan)

let studentScore: Observable<Event<Int>> = student
.flatMapLatest {
$0.score.materialize()
}

studentScore
.filter {
guard $0.error == nil else {
print($0.error!)
return false
}

return true
}
.dematerialize()
.subscribe(onNext: {
print("subscribe: \($0)")
})
.disposed(by: disposeBag)

ryan.score.onNext(85)
ryan.score.onError(MyError.anError)
ryan.score.onNext(90)
student.onNext(charlotte)

Console 输出结果:

subscribe: 80
subscribe: 85
anError
subscribe: 100

第八章:Transforming 实践

在上一章中学习了 RxSwift 的主要功能,包括:map 和 flatMap 。

开始 GitFeed 项目

本章中要处理的项目将显示 GitHub 仓库的动态, 如所有最新的 like, fork 和 comment.。

该项目将有两个不同的故事情节:

  • 主要的情节是要接触到 GitHub 的 JSON API, 接收 JSON 响应, 并最终将其转换为对象集合.
  • 次要将提取的对象保存到磁盘并在 “新” 活动列表之前的表中显示它们。从服务器获取事件。

网络获取数据

我们将会将 URLSession 添加个响应式拓展。

CocoaAPIToRxAPI

具体就是把网络 API 的请求和响应分别封装起来:

其中 share(replay:, scope:) ,可以避免 URLSession.rx.response(request:) 因过早 .complete 造成订阅时重新发送网络请求。原理就是封装多个 Observable 的元素为一个新的 Observable,并指定缓冲区最大元素数量,生命周期。定义如下:

func share(replay: Int = default, scope: SubjectLifetimeScope = default)
Summary

Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays elements in buffer.
Declaration

func share(replay: Int = default, scope: SubjectLifetimeScope = default) -> Observable<(response: HTTPURLResponse, data: Data)>

Discussion

This operator is equivalent to:
.whileConnected
// Each connection will have it's own subject instance to store replay events.
// Connections will be isolated from each another.
source.multicast(makeSubject: { Replay.create(bufferSize: replay) }).refCount()
.forever
// One subject will store replay events for all connections to source.
// Connections won't be isolated from each another.
source.multicast(Replay.create(bufferSize: replay)).refCount()
It uses optimized versions of the operators for most common operations.
Parameters

replay
Maximum element count of the replay buffer.
scope
Lifetime scope of sharing subject. For more information see SubjectLifetimeScope enum.
Returns

An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.

代码如下:

func fetchEvents(repo: String) {
// 1. 使用 map 构造一个请求
let response = Observable.from([repo])
.map { urlString -> URL in
return URL(string: "https://api.github.com/repos/\(urlString)/events")!
}
.map { [weak self] url -> URLRequest in
var request = URLRequest(url: url)
if let modifiedHeader = self?.lastModified.value {
request.addValue(modifiedHeader as String,
forHTTPHeaderField: "Last-Modified")
}
return request
}
.flatMap { request -> Observable<(response: HTTPURLResponse, data: Data)> in
return URLSession.shared.rx.response(request: request)
}
.share(replay: 1, scope: .whileConnected)

// 2. 转换响应
response
.filter { response, _ in
// 检查右边的值是否在左边的 range 内
return 200..<300 ~= response.statusCode
}
.map { _, data -> [[String: Any]] in
guard let jsonObject = try? JSONSerialization.jsonObject(with: data, options: []),
let result = jsonObject as? [[String: Any]] else {
return []
}
return result
}
.filter { objects in
return objects.count > 0
}
.map { objects in
return objects.flatMap(Event.init)
}
.subscribe(onNext: { [weak self] newEvents in
self?.processEvents(newEvents)
})
.disposed(by: bag)

response
.filter { response, _ in
return 200..<400 ~= response.statusCode
}
.flatMap { response, _ -> Observable<NSString> in
guard let value = response.allHeaderFields["Last-Modified"] as? NSString else {
return Observable.empty()
}
return Observable.just(value)
}
.subscribe(onNext: { [weak self] modifiedHeader in
guard let strongSelf = self else { return }
strongSelf.lastModified.value = modifiedHeader
try? modifiedHeader.write(to: strongSelf.modifiedFileURL, atomically: true,
encoding: String.Encoding.utf8.rawValue)
})
.disposed(by: bag)
}

第九章:合并运算符

在前面的章节中, 你学习了如何创建、filter 和转换可观察序列。RxSwift filtering 和转换运算符的行为与 Swift 的标准集合运算符非常相似。你看到了 RxSwift 与 flatMap 的真正力量, 它让你用很少的代码执行很多任务.

这一章将向你展示几种不同的组合序列的方法, 以及如何在每个序列中组合数据。你将使用的一些操作员与 Swift 集合操作员非常相似。它们有助于结合异步序列中的元素, 就像使用 Swift 数组一样。

前缀和串联

startWith(_:) 运算符赋值给定初始值的可观察序列。此值必须与可观察元素的类型相同。

let numbers = Observable.of(2, 3, 4)
let observable = numbers.startWith(1) observable.subscribe(onNext: { value in print(value) })

类似地 .concat(_:) 可以合并两个序列。

let first = Observable.of(1, 2, 3) let second = Observable.of(4, 5, 6)
let observable = Observable.concat([first, second])
observable.subscribe(onNext: { value in print(value) })

Merge

Merge 也是合并,不过会按照顺序发射事件。

merge

let left = PublishSubject<String>()
let right = PublishSubject<String>()

let source = Observable.of(left.asObservable(), right.asObservable())

let observable = source.merge()
let disposable = observable.subscribe(onNext: { value in
print(value)
})

var leftValues = ["Berlin", "Munich", "Frankfurt"]
var rightValues = ["Madrid", "Barcelona", "Valencia"]

repeat {
if arc4random_uniform(2) == 0 {
if !leftValues.isEmpty {
left.onNext("Left:" + leftValues.removeFirst())
}
} else if !rightValues.isEmpty {
right.onNext("Right:" + rightValues.removeFirst())
}
} while !leftValues.isEmpty || !rightValues.isEmpty

disposable.dispose()

Console 输出:

Right: Madrid
Right: Barcelona
Left: Berlin
Left: Munich
Left: Frankfurt
Right: Valencia

Combining 元素

RxSwift 中一组基本的运算符是 combineLatest 。他们组合来自几个序列的值:

Combining elements

Every time one of the inner (combined) sequences emits a value, it calls a closure you provide. You receive the last value from each of the inner sequences. This has many concrete applications, such as observing several text fields at once and combining their value, watching the status of multiple sources, and so on.

每当内部(组合)序列中的一个发出一个值时,它会调用您提供的闭包。 你会收到每个内部序列的最后一个值。 这里有很多具体的应用场景,例如一次观察几个文本字段并结合它们的价值,观察多个来源的状态等等。

let left = PublishSubject<String>()
let right = PublishSubject<String>()

let observable = Observable.combineLatest(left, right, resultSelector: {
lastLeft, lastRight in
"\(lastLeft) \(lastRight)"
})
let disposable = observable.subscribe(onNext: { value in
print(value)
})

print("> Sending a value to Left")
left.onNext("Hello,")
print("> Sending a value to Right")
right.onNext("world")
print("> Sending another value to Right")
right.onNext("RxSwift")
print("> Sending another value to Left")
left.onNext("Have a good day,")

disposable.dispose()

Console 输出:

> Sending a value to Left
> Sending a value to Right
Hello, world
> Sending another value to Right
Hello, RxSwift
> Sending another value to Left
Have a good day, RxSwift

zip 总是按照顺序一对一对的组合。类似把两个数组按照相同的下标进行组合。

zip

enum Weather {
case cloudy
case sunny
}
let left: Observable<Weather> = Observable.of(.sunny, .cloudy, .cloudy, .sunny)
let right = Observable.of("Lisbon", "Copenhagen", "London", "Madrid", "Vienna")

let observable = Observable.zip(left, right) { weather, city in
return "It's \(weather) in \(city)"
}
observable.subscribe(onNext: { value in
print(value)
})

Console 输出:

It's sunny in Lisbon
It's cloudy in Copenhagen
It's cloudy in London
It's sunny in Madrid

Triggers

应用程序有不同的需求,必须管理多个输入源。 你经常需要立即接受来自多个观察对象的输入。 有些会简单地在代码中触发行为,而其他的将提供数据。 RxSwift 已经覆盖了强大的运算符这会让你的生活更轻松。 或者,至少你的编码生活!

首先看看 withLatestFrom(_:)。 经常被初学者忽略,它在处理用户界面等方面是一个有用的配套工具。trigger

let button = PublishSubject<Void>()
let textField = PublishSubject<String>()

let observable = button.withLatestFrom(textField)
_ = observable.subscribe(onNext: { value in
print(value)
})

textField.onNext("Par")
textField.onNext("Pari")
textField.onNext("Paris")
button.onNext(())
button.onNext(())

Console 输出:

Paris
Paris

sample(_:) 和 几乎只有一个变化:每当触发器 observable 发射一个值时,sample(_:) 从其它的可观察值发出最新值,但只有在自上一次元素后才到达。 如果没有新的数据到达,sample(_:) 将不会发射任何东西。sample

Switches

Switches

amb(_:) 运算符订阅左侧和右侧的可观察序列。 它等待其中任何一个发出一个元素,然后退订另一个序列。 之后,它只传递第一个活动可观察元素。 它可以从模棱两可中选出一个:首先,你不知道你感兴趣的是哪一个序列,等其中先发射一个序列再做决定。

这个运算符经常被忽视。 它有一些精选的实际应用场景,如连接到冗余服务器,并坚持使用第一个响应的服务器。

let left = PublishSubject<String>()
let right = PublishSubject<String>()

let observable = left.amb(right)
let disposable = observable.subscribe(onNext: { value in
print(value)
})

left.onNext("Lisbon")
right.onNext("Copenhagen")
left.onNext("London")
left.onNext("Madrid")
right.onNext("Vienna")

disposable.dispose()

更流行的选项是 switchLatest() 运算符:

可以自由切换偏爱那个子序列发射的元素。switchLatest

let one = PublishSubject<String>()
let two = PublishSubject<String>()
let three = PublishSubject<String>()

let source = PublishSubject<Observable<String>>()

let observable = source.switchLatest()
let disposable = observable.subscribe(onNext: { value in
print(value)
})

source.onNext(one)
one.onNext("Some text from sequence one")
two.onNext("Some text from sequence two")

source.onNext(two)
two.onNext("More text from sequence two")
one.onNext("and also from sequence one")

source.onNext(three)
two.onNext("Why don't you seem me?")
one.onNext("I'm alone, help me")
three.onNext("Hey it's three. I win.")

source.onNext(one)
one.onNext("Nope. It's me, one!")

disposable.dispose()

Console 输出:

Some text from sequence one
More text from sequence two
Hey it's three. I win.
Nope. It's me, one!

形成可观察序列的心智模型可能很困难。 别担心, 你会习惯它。 练习是顺序理解序列的关键。 随着您的体验增长,请随时审查这些示例! 你将在下一章中更好地了解如何使用它。

Combining 一个序列中元素

所有的厨师都知道你减少得越多,酱汁就越美味。 虽然不是针对厨师,但 RxSwift 拥有将酱汁减少到最有味道的组分的工具。

reduce(_:_:) 和 Swift 标准库中类似。reduce

let source = Observable.of(1, 3, 5, 7, 9)
let observable = source.reduce(0, accumulator: { summary, newValue in
return summary + newValue
})

observable.subscribe(onNext: { value in
print(value)
})

Console 输出:

25

scan(_:accumulator:)reduce(_:_:) 只有一个最终结果不同:每次发射事件,都会计算结果并发射。scan

let source = Observable.of(1, 3, 5, 7, 9)

let observable = source.scan(0, accumulator: +)
observable.subscribe(onNext: { value in
print(value)
})

Console 输出:

1
4
9
16
25

第十章:合并运算符实践

在前一章中,你学习了将操作符合并,并通过对一些相当令人头脑灵活的概念进行越来越详细的练习。 一些运算符可能会让你对这些反应性概念的真实应用感到疑惑。
在“……实践”一章中,你将有机会尝试一些最强大的运算符。 你将学会解决类似于你在自己的应用程序中遇到的问题。

准备网络后端服务

这里使用 NASA 的 EONET 服务。

我们称之为 EONET 服务。 它抽象访问由 EONET 服务器公开的数据,将它们作为服务提供给你的应用程序。 你会看到,结合 Rx,这种模式将找到许多应用程序。 它可以让你在应用程序内清晰地分离数据生产和消耗。 你可以轻松替换或模拟生产环境,而不会对用户方面产生任何影响。

先封装一个通用网络请求,然后把 categories 封装为单例,方便所有订阅者订阅。

Categories view controller

categories view controller 显示 categories 列表。

Adding the event download service

Getting events for categories

Events view controller

Wiring the days selector

Splitting event downloads

第十一章:基于时间的运算符

时间就是一切。响应式编程背后的核心思想是基于时间异步数据流的模型。在这方面, RxSwift 提供了一系列操作, 使你能够处理时间和序列在一段时间内的响应和转换方式。正如你将在本章中看到的, 管理序列的时间维度是简单而直接的。