RxSwift Ⅳ:RxSwift 和 RxCocoa 进阶

一旦你开始编写完整的应用程序与 RxSwift 和 RxCocoa, 你也需要照顾更多的中间主题比简单地观察事件和处理他们与 Rx。

在一个完整的生产质量应用程序中, 您需要构建一个错误处理策略, 执行更高级的多线程处理, 创建一个坚实的测试套件等等。

在这部分, 你将通过学习四个挑战性的章节, 这将解除您的 Rx 从一个菜鸟级的状态到一个实战经验的战士。

第十四章:错误处理实践

生活将是美好的, 如果我们生活在一个完美的世界, 但不幸的事情往往不像预期的那样去。即使是最好的 RxSwift 开发人员也不能避免遇到错误, 因此他们需要知道如何优雅和高效地处理它们。在本章中, 您将学习如何处理错误, 如何通过重试来管理错误恢复, 或者只是向整个宇宙投降, 让错误继续。

管理错误

常见错误类型:网络连接失败、无效的输入和 API 错误或者 HTTP 错误。

在 RxSwift 中, 错误处理是框架的一部分, 可以通过两种方式进行处理:

  • Catch: 从错误中恢复并使用默认值。
  • Retry: 重试有限 (或无限制) 次数。

本章项目的起始版本没有任何实际的错误处理。所有错误都是用一个返回虚拟版本的 catchErrorJustReturn 捕获的。这听起来像是一个方便的解决方案, 但是在 RxSwift 中有更好的处理方法。在任何一流的应用程序中, 都应该有一个一致的、信息错误处理方法。

抛出错误

一个好的起点是处理 RxCocoa 错误,它包装底层 Apple 框架返回的系统错误。

public func data(request: URLRequest) -> Observable<Data> {...}
if 200 ..< 300 ~= response.statusCode { 
return data
} else {
throw RxCocoaURLError.httpRequestFailed(response: response, data: data)
}

用 catch 处理错误

在解释如何抛出错误之后, 是时候看看如何处理错误了。最基本的方法是使用 catch。catch 的工作方式很像在 Swift 中的 do-try-catch 流程。执行可观察的操作, 如果出现错误, 则返回包装错误的事件。

在 RxSwift 中有两个主要的运算符来捕获错误。

func catchError(_ handler:) -> RxSwift.Observable<Self.E>
func catchErrorJustReturn(_ element:) -> RxSwift.Observable<Self.E>

一个常见的陷阱

链式调用时候,一个错误就会导致整个订阅以错误结束。有的时候我们想知道错误的仅有一步细节,如 HTTP 404 错误,属于 API 返回错误的具体信息的约定,就需要移除 .catchErrorJustReturn(ApiController.Weather.empty) 来避免出现不期望的错误事件发射。

捕捉错误

我们先定义一个属性来缓存天气数据。

var cache = [String: Weather]()

再把输入框的序列改为:

let textSearch = searchInput.flatMap { text in
return ApiController.shared.currentWeather(city: text ?? "Error")
.do(onNext: { data in
if let text = text {
self.cache[text] = data
}
}, onError: { [weak self] e in
guard let strongSelf = self else { return }
DispatchQueue.main.async {
strongSelf.showError(error: e)
}
})
.retryWhen(retryHandler)
.catchError { error in
if let text = text, let cachedData = self.cache[text] {
return Observable.just(cachedData)
} else {
return Observable.just(ApiController.Weather.empty)
}
}
}

错误的重试机制

关于重试上面已经设计到了 .retryWhen(retryHandler),具体处理实现如下:

let retryHandler: (Observable<Error>) -> Observable<Int> = { e in
return e.enumerated().flatMap { (attempt, error) -> Observable<Int> in
if attempt >= maxAttempts - 1 {
return Observable.error(error)
} else if let casted = error as? ApiController.ApiError, casted == .invalidKey {
return ApiController.shared.apiKey.filter {$0 != ""}.map { _ in return 1 }
}
print("== retrying after \(attempt + 1) seconds ==")
return Observable<Int>.timer(Double(attempt + 1), scheduler: MainScheduler.instance).take(1)
}
}

或者简化版:.retry(3)

自定义错误

创建自定义错误遵循一般的 Swift 原则, 所以没有一个好的 Swift 程序员不会知道, 但它仍然是好的, 看看如何处理错误和创建定制的操作符。

创建自定义错误

先定义错误枚举,然后在结果过滤中抛出定义的错误即可。

enum ApiError: Error {
case cityNotFound
case serverFailure
case invalidKey
}
return session.rx.response(request: request).map() { response, data in
if 200 ..< 300 ~= response.statusCode {
return try JSON(data: data)
} else if response.statusCode == 401 {
throw ApiError.invalidKey
} else if 400 ..< 500 ~= response.statusCode {
throw ApiError.cityNotFound
} else {
throw ApiError.serverFailure
}
}

订阅者可以根据实际场景把错误展示给用户,这部分和 Swift 中自定义错误一致,就不展开讲了。

错误处理进阶

比如处理 API 中 401 类型,需要提示用户授权失败,需要重新登录或授权。本例中需要是指 API Key 失效,需要用户填写新的有效的 API Key。

else if response.statusCode == 401 {
throw ApiError.invalidKey
}

Materialize 和 dematerialize

materialize 和 dematerialize 通常一起使用, 并且有能力完全打破原始可观察的合同。当没有其他选择来处理特定的情况时, 请小心地使用它们, 并且只有在必要的时候。

常见的应用是:进行日志记录

observableToLog.materialize()
.do(onNext: { (event) in
myAdvancedLogEvent(event)
})
.dematerialize()

第十五章:介绍调度程序

到目前为止,您已经设法使用调度程序,同时避免任何有关它们如何处理线程或并发的解释。 在前面的章节中,您使用了隐式使用某种并发 / 线程级别的方法,例如 bufferdelaySubscriptioninterval运算符。

您可能感觉调度程序有一些神奇的东西,但在您了解调度程序之前,您还需要了解那些 observeOn 函数的全部内容。

本章将介绍调度程序背后的美观,在这里您将了解为什么 Rx 抽象如此强大以及为什么使用异步编程远比使用锁或队列更省心。

调度程序到底是什么?

在你着手学习时调度程序, 了解他们是什么和他们不是什么是很重要的。总而言之, 调度程序是一个过程发生的上下文。此上下文可以是线程、调度队列或类似的实体, 甚至是在 OperationQueueScheduler 内部使用的 NSOperation。

下图是个好的例子:

schedulerExample

在此图中,具有缓存运算符的概念。 observable 向服务器发出请求并检索一些数据。此数据由名为 cache 的自定义运算符处理,该运算符将数据存储在某处。 在此之后,数据被传递给不同调度程序中的所有订阅者,很可能是位于主线程之上的 MainScheduler,使得 UI 的更新成为可能。

揭开调度程序

调度程序虽然工作方式和 GCD 类似,却不是等价的。

要记住的重要一点是, 调度程序不是线程, 并且它们没有与线程的一对一关系。始终检查计划程序执行操作的上下文, 而不是线程。在本章的后面部分, 您将遇到一些好的例子来帮助您理解这一点。

设置项目

编写两个函数打印当前调度程序所在线程。

切换调度程序

Rx 中最重要的事情之一是随时切换调度程序的能力, 没有任何限制, 除了内部进程生成事件所强加的约束之外。

fruit 是在主线程上生成的, 但是将它移动到后台线程是很好的。要在后台线程中创建 fruit , 必须使用 subscribeOn

let fruit = Observable<String>.create { observer in
observer.onNext("[apple]")
sleep(2)
observer.onNext("[pineapple]")
sleep(2)
observer.onNext("[strawberry]")
return Disposables.create()
}

fruit
.subscribeOn(globalScheduler)
.dump()
.observeOn(MainScheduler.instance)
.dumpingSubscription()
.disposed(by: bag)

观察是 Rx 的三基本概念之一。它涉及实体生成事件, 以及这些事件的观察者。在这种情况下, 在对应 subscribeOn 的情况下, 操作符 observeOn 改变了观察发生的调度程序。

这是一个非常常见的模式。您使用后台进程从服务器检索数据并处理接收的数据, 只切换到 MainScheduler 处理 final 事件并在用户界面中显示数据。

陷阱

let globalScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
let bag = DisposeBag()
let animal = BehaviorSubject(value: "[dog]")


let animalsThread = Thread() {
sleep(3)
animal.onNext("[cat]")
sleep(3)
animal.onNext("[tiger]")
sleep(3)
animal.onNext("[fox]")
sleep(3)
animal.onNext("[leopard]")
}

animalsThread.name = "Animals Thread"
animalsThread.start()


animal.subscribeOn(MainScheduler.instance)
.dump()
.observeOn(globalScheduler)
.dumpingSubscription()
.disposed(by:bag)


RunLoop.main.run(until: Date(timeIntervalSinceNow: 13))

输出结果:

00s | [D] [dog] received on Main Thread
00s | [S] [dog] received on Anonymous Thread
03s | [D] [cat] received on Animals Thread
03s | [S] [cat] received on Anonymous Thread
06s | [D] [tiger] received on Animals Thread
06s | [S] [tiger] received on Anonymous Thread
09s | [D] [fox] received on Animals Thread
09s | [S] [fox] received on Anonymous Thread
12s | [D] [leopard] received on Animals Thread
12s | [S] [leopard] received on Anonymous Thread

结果出人意料,并没有如愿发生在主线程上。这是一个常见的和危险的陷阱, 它来自于在默认情况下认为 Rx 是异步的或多线程的,但这不是事实。

Rx 和一般抽象是自由线程的; 在处理数据时, 没有发生魔术般的线程切换。如果不指定其他的线程, 则始终在原始线程上执行计算。

任何线程切换都是在程序员使用运算符 subscribeOnobserveOn 的显式请求之后发生的。

认为 Rx 做一些线程处理默认会陷入一个常见的陷阱。上面发生的事情是对 Subject 的误用。原始计算在指定的线程上发生, 并且这些事件使用 Thread() { ...} 在该线程中被推入。由于 Subject 的性质, Rx 没有能力切换原始的计算调度程序, 并移动到另一个线程, 因为没有直接控制的 Subject 被推出。

但是下面的例子:

let globalScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
let bag = DisposeBag()
let animal = BehaviorSubject(value: "[dog]")

let fruit = Observable<String>.create { observer in
observer.onNext("[apple]")
sleep(2)
observer.onNext("[pineapple]")
sleep(2)
observer.onNext("[strawberry]")
return Disposables.create()
}

fruit.subscribeOn(globalScheduler)
.dump()
.observeOn(MainScheduler.instance)
.dumpingSubscription()
.disposed(by:bag)

RunLoop.main.run(until: Date(timeIntervalSinceNow: 13))

输出:

00s | [D] [apple] received on Anonymous Thread
00s | [S] [apple] received on Main Thread
02s | [D] [pineapple] received on Anonymous Thread
02s | [S] [pineapple] received on Main Thread
04s | [D] [strawberry] received on Anonymous Thread
04s | [S] [strawberry] received on Main Thread

为什么这适用于 fruit 线程呢? 这是因为使用 Observable.create 可以让 Rx 控制 Thread 块内部发生的事情,这样你就可以更加精确地定制线程处理。
这种意想不到的结果通常被称为“冷和热”可观测问题。
在上面的例子中,你正在处理热观察。 observable 在订阅期间没有任何副作用,但它确实有自己的上下文,其中生成事件并且 RxSwift 无法控制它(即,它运行自己的 Thread)。
相反,冷观察不会在任何观察者订阅之前产生任何元素。 这实际上意味着它没有自己的上下文,直到订阅时,它创建一些上下文并开始生成元素。

热 vs 冷

上面的部分谈到了冷热观测量的话题。冷热观测量的话题是相当固执己见的, 产生了很多争论, 所以让我们简单在这里看看。这个概念可以归结为一个非常简单的问题:

HotVSCold

一些副作用的例子是:

  • 向服务器发送请求
  • 编辑本地数据库
  • 写入文件系统
  • 发射火箭

最佳实践和内置调度程序

调度程序是一个非平凡的主题, 因此它们会为最常见的用例提供一些最佳实践。在本节中, 您将获得串行和并发调度程序的快速介绍, 了解它们如何处理数据并查看哪种类型对特定上下文更有效。

串行与并发调度程序

虑到调度程序只是一个上下文, 它可以是任何东西 (调度队列、线程、自定义上下文), 并且所有转换序列的运算符都需要保留隐式保证, 因此需要确保您使用的是正确的计划程序。

  • 如果您使用的是串行调度程序, Rx 将按顺序进行计算。对于串行调度队列, 调度程序还可以在底层执行自己的优化。

  • 在并发计划程序中, Rx 将尝试同时运行代码, 但 observeOnsubscribeOn 将保留执行任务所需的顺序, 并确保订阅代码在正确的计划程序上结束。

    MainScheduler

MainScheduler 位于主线程的顶端。此计划程序用于处理用户界面上的更改并执行其他高优先级任务。作为在 iOS、tvOS 或 macOS 上开发应用程序的一般做法, 不应使用此计划程序执行长时间运行的任务, 因此应避免诸如服务器请求或其他繁重任务之类的事情。

MainScheduler 还用于在使用单位和更多特定的(如:Driver)执行所有计算。如前一章所述, Driver 确保始终在 MainScheduler 中执行计算, 以使您能够将数据直接绑定到应用程序的用户界面。

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler 负责在串行 DispatchQueue 。这个调度程序在使用 observeOn 时,有很多优化的优势。

你可以使用此计划程序来处理以串行方式更好地安排的后台作业。例如, 如果应用程序与服务器的单个路径 (如 Firebase 或 GraphQL 应用程序) 进行对话, 则可能需要避免调度多个同时请求, 这会给接收端造成太大的压力。这个调度程序是你绝对想要的, 像串行任务队列一样先进。

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler 与 SerialDispatchQueueScheduler 类似,负责在 DispatchQueue 上抽象工作。 主要区别在于,调度程序使用并行队列而不是串行队列。

此类调度程序在使用 observeOn 时没有进行优化, 因此在决定使用哪种调度程序时, 请记住说明这一点。

对于需要同时结束的多个长时间运行的任务,并发的调度程序可能是一个不错的选择。 将多个可观察对象与阻塞运算符组合在一起,以便在准备好时将所有结果组合在一起,可以防止串行调度程序以最佳状态执行。 相反,并发调度程序可以执行多个并发任务并优化结果的收集。

OperationQueueScheduler

OperationQueueScheduler 类似于 ConcurrentDispatchQueueScheduler,但不是通过 DispatchQueue 抽象工作,它在 NSOperationQueue 上执行工作。 有时你需要对正在运行的并发作业进行更多控制,而对于并发 DispatchQueue 则无法做到这一点。

如果需要调整最大并发作业数,则这是作业的调度程序。 您可以定义 maxConcurrentOperationCount 来限制并发操作的数量以满足您的应用程序的需要。

TestScheduler

TestScheduler 是一种特殊的野兽。 它仅用于测试,因此尽量不要在生产代码中使用此调度程序。 这种特殊的调度程序简化了操作员测试; 它是 RxTest 库的一部分。 您将在关于测试的专用章节中了解如何使用此调度程序,但是让我们快速浏览一下,因为您正在进行调度程序的全程浏览。

第十六章:使用 RxTest 进行测试

本章将向您介绍 RxTest, 以及以后的 RxBlocking, 通过编写针对多个 RxSwift 操作的测试, 还可以编写针对生产 RxSwift 代码的测试。

本章例子是:一个将 Hex 颜色值转换为 RGB 值的应用。架构是 MVVM。

使用 RxTest 测试操作符

RxTest 是 RxSwift 外一个独立的库。它是托管在 RxSwift 仓库, 但需要一个单独的 pod 安装和导入。RxTest 为测试 RxSwift 代码提供了许多有用的补充, 如 TestScheduler, 它是一个虚拟时间计划程序, 它使你可以对测试的时间线性操作进行粒状控制, 并且包括 next(_:_:), completed(_:_:)error(_:_:) 在测试中 specified 时间启用将这些事件添加到观测量中。它还增加了热和冷的观测量, 你可以想到的热和冷三明治。当然, 不是真的三明治。

热和冷的序列是什么?

RxSwift 很长的时间来简化和简化你的 Rx 代码, 并且有热和冷的序列的区别, 当它涉及到观测量, 在 RxSwift 可以被认为是可观测的特征而不是具体的类型。

热序列:

  • 使用资源,无论是否有订阅者。
  • 生成元素,无论是否有订阅者。
  • 主要用于有状态类型,如变量。

冷序列:

  • 仅在订阅时消耗资源。
  • 仅在有订阅者时才生成元素。
  • 主要用于网络等异步操作。

下面是 amb 的测试用例:

func testAmb() {

let observer = scheduler.createObserver(String.self)

let observableA = scheduler.createHotObservable([
next(100, "a"),
next(110, "b"),
next(300, "c")
])

let observableB = scheduler.createHotObservable([
next(90, "1"),
next(200, "2"),
next(300, "3")
])

let ambObservable = observableA.amb(observableB)

scheduler.scheduleAt(0) {
self.subscription = ambObservable.subscribe(observer)
}

scheduler.start()

let results = observer.events.map {
$0.value.element!
}

XCTAssertEqual(results, ["1", "2", "3"])
}

上面是同步测试,如果想测试异步错误,最简单地是使用 RxBlocking

使用 RxBlocking

RxBlocking 是另一个库, 寄存在 RxSwift 仓库。它的主要目的是通过它的 toBlocking(timeout:) 方法将 observable 转换为 BlockingObservable

func testToArray() {
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
let toArrayObservable = Observable.of(1, 2).subscribeOn(scheduler)
XCTAssertEqual(try! toArrayObservable.toBlocking().toArray(), [1, 2])
}
func testToArrayMaterialized() {
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)

let toArrayObservable = Observable.of(1, 2).subscribeOn(scheduler)

let result = toArrayObservable
.toBlocking()
.materialize()

switch result {
case .completed(elements: let elements):
XCTAssertEqual(elements, [1, 2])
case .failed(_, error: let error):
XCTFail(error.localizedDescription)
}
}

测试 RxSwift 生产代码

下面例子是测试 MVVM 中 VM 层:

使用 toBlocking 很方便编写异步测试。

import XCTest
import RxSwift
import RxCocoa
import RxTest
@testable import Testing

class TestingViewModel: XCTestCase {

var viewModel: ViewModel!
var scheduler: ConcurrentDispatchQueueScheduler!

override func setUp() {
super.setUp()

viewModel = ViewModel()
scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
}

func testColorIsRedWhenHexStringIsFF0000_async() {

let disposeBag = DisposeBag()

let expect = expectation(description: #function)

let expectedColor = UIColor(red: 1.0, green: 0.0, blue: 0.0, alpha: 1.0)

var result: UIColor!

viewModel.color.asObservable()
.skip(1)
.subscribe(onNext: {
result = $0
expect.fulfill()
})
.disposed(by: disposeBag)

viewModel.hexString.value = "#ff0000"

waitForExpectations(timeout: 1.0) { error in
guard error == nil else {
XCTFail(error!.localizedDescription)
return
}

XCTAssertEqual(expectedColor, result)
}
}

func testColorIsRedWhenHexStringIsFF0000() {

let colorObservable = viewModel.color.asObservable().subscribeOn(scheduler)

viewModel.hexString.value = "#ff0000"

do {
guard let result = try colorObservable.toBlocking(timeout: 1.0).first() else { return }

XCTAssertEqual(result, .red)
} catch {
print(error)
}
}


func testRgbIs010WhenHexStringIs00FF00() {

let rgbObservable = viewModel.rgb.asObservable().subscribeOn(scheduler)

viewModel.hexString.value = "#00ff00"

let result = try! rgbObservable.toBlocking().first()!

XCTAssertEqual(0 * 255, result.0)
XCTAssertEqual(1 * 255, result.1)
XCTAssertEqual(0 * 255, result.2)
}

func testColorNameIsRayWenderlichGreenWhenHexStringIs006636() {

let colorNameObservable = viewModel.colorName.asObservable().subscribeOn(scheduler)

viewModel.hexString.value = "#006636"

XCTAssertEqual(try! colorNameObservable.toBlocking().first()!, "rayWenderlichGreen")
}
}

第十七章:创建自定义响应式拓展

在学习了 RxSwift 和 RxCocoa,以及如何编写测试,这里我们将学习如何给 Apple 和第三方库编写 RxSwift 的拓展。本章例子是给 NSURLSession 编写拓展,来实现网络请求、缓存。但却是一个教学实例,如果生产项目可以有很多网络库可以直接使用,如 RxAlamofire、RxMoya 等。

如何创建拓展

创建一个 Cocoa 或库的拓展不是一个简单的任务。你会发现这个过程可能很棘手,你的解决方案在继续之前可能需要一些前期思考。

这里我们将学习如何拓展 URLSession,加上 Rx 的命名空间。

extension Reactive where Base: URLSession {

func response(request: URLRequest) -> Observable<(HTTPURLResponse, Data)> {
return Observable.create { observer in
// content goes here

let task = self.base.dataTask(with: request) { (data, response, error) in
guard let response = response, let data = data else {
observer.on(.error(error ?? RxURLSessionError.unknown))
return
}

guard let httpResponse = response as? HTTPURLResponse else {
observer.on(.error(RxURLSessionError.invalidResponse(response: response)))
return
}
observer.onNext((httpResponse, data))
observer.on(.completed)
}
task.resume()

return Disposables.create(with: task.cancel)
}
}

func data(request: URLRequest) -> Observable<Data> {
if let url = request.url?.absoluteString, let data = internalCache[url] {
return Observable.just(data)
}

return response(request: request).cache().map { (response, data) -> Data in
if 200 ..< 300 ~= response.statusCode {
return data
} else {
throw RxURLSessionError.requestFailed(response: response, data: data)
}
}
}

func string(request: URLRequest) -> Observable<String> {
return data(request: request).map { d in
return String(data: d, encoding: .utf8) ?? ""
}
}

func json(request: URLRequest) -> Observable<JSON> {
return data(request: request).map { d in
return try JSON(data: d)
}
}

func image(request: URLRequest) -> Observable<UIImage> {
return data(request: request).map { d in
return UIImage(data: d) ?? UIImage()
}
}

}

如何创建自定义运算符

这里我们演示如何使用运算符进行缓存网络请求结果(HTTPURLResponse、Data)的序列,简单起见使用字典。

fileprivate var internalCache = [String: Data]()

extension ObservableType where E == (HTTPURLResponse, Data) {
func cache() -> Observable<E> {
return self.do(onNext: { (response, data) in
if let url = response.url?.absoluteString, 200 ..< 300 ~= response.statusCode {
internalCache[url] = data
}
})
}
}

如何使用封装的拓展

在 cell 中更具给定 url 下载 gif 并显示出来。

func downloadAndDisplay(gif stringUrl: String) {
guard let url = URL(string: stringUrl) else { return }
let request = URLRequest(url: url)
activityIndicator.startAnimating()

let s = URLSession.shared.rx.data(request: request)
.observeOn(MainScheduler.instance)
.subscribe(onNext: { imageData in
self.gifImageView.animate(withGIFData: imageData)
self.activityIndicator.stopAnimating()
})
disposable.setDisposable(s)
}

测试封装的拓展

这里我们使用 RxNimble 来辅助我们编写测试。RxNimble 使测试更容易编写, 并有助于代码更简明。

RxBlocking 版本:

let result = try! observable.toBlocking().first()
expect(result) == 42

RxNimble 版本:

expect(observable).first == 42

完整测试代码:

import XCTest
import RxSwift
import RxBlocking
import Nimble
import RxNimble
import OHHTTPStubs
import SwiftyJSON

@testable import iGif

class iGifTests: XCTestCase {

let obj = ["array":["foo","bar"], "foo":"bar"] as [String : Any]
let request = URLRequest(url: URL(string: "http://raywenderlich.com")!)
let errorRequest = URLRequest(url: URL(string: "http://rw.com")!)

override func setUp() {
super.setUp()
// Put setup code here. This method is called before the invocation of each test method in the class.
stub(condition: isHost("raywenderlich.com")) { _ in
return OHHTTPStubsResponse(jsonObject: self.obj, statusCode: 200, headers: nil)
}
stub(condition: isHost("rw.com")) { _ in
return OHHTTPStubsResponse(error: RxURLSessionError.unknown)
}
}

override func tearDown() {
// Put teardown code here. This method is called after the invocation of each test method in the class.
super.tearDown()
OHHTTPStubs.removeAllStubs()
}

func testData() {
let observable = URLSession.shared.rx.data(request: self.request)
expect(observable.toBlocking().firstOrNil()).toNot(beNil())
}

func testString() {
let observable = URLSession.shared.rx.string(request: self.request)
let string = "{\"array\":[\"foo\",\"bar\"],\"foo\":\"bar\"}"
expect(observable.toBlocking().firstOrNil()) == string
}

func testJSON() {
let observable = URLSession.shared.rx.json(request: self.request)
let string = "{\"array\":[\"foo\",\"bar\"],\"foo\":\"bar\"}"
let json = try? JSON(data: string.data(using: .utf8)!)
expect(observable.toBlocking().firstOrNil()) == json
}

func testError() {
var erroredCorrectly = false
let observable = URLSession.shared.rx.json(request: self.errorRequest)
do {
let _ = try observable.toBlocking().first()
assertionFailure()
} catch (RxURLSessionError.unknown) {
erroredCorrectly = true
} catch {
assertionFailure()
}
expect(erroredCorrectly) == true
}
}

extension BlockingObservable {
func firstOrNil() -> E? {
do {
return try first()
} catch {
return nil
}
}
}

常用封装

RxSwift 社区非常活跃,并且已经有很多扩展和封装。 一些基于 Apple 组件,而另一些则基于许多 iOS 和 macOS 项目中广泛使用的第三方库。
你可以在 http://community.rxswift.org 找到最新的封装列表。

下面几个常用的封装:

  • RxDataSources
  • RxAlamofire
  • RxBluetoothKit

小结

关于何时需要抽象,没有真正明确的规则,但建议如果框架满足以下一个或多个条件,则应用此策略:

  • 使用带有完成和失败信息的回调
  • 使用大量委托异步返回信息
  • 需要与应用程序的其他 RxSwift 部分进行互操作