// Maps each search input to a weather request for that city.
// - Successful responses are stored in `cache`, keyed by the searched text.
// - Errors are handed to `showError` on the main queue, then the request is
//   retried according to `retryHandler`.
// - If the retries are exhausted, the cached value for the text is emitted,
//   or `ApiController.Weather.empty` when nothing is cached.
let textSearch = searchInput.flatMap { text in
  return ApiController.shared.currentWeather(city: text ?? "Error")
    .do(onNext: { data in
      // Remember the latest successful response for this search text.
      if let text = text {
        self.cache[text] = data
      }
    }, onError: { [weak self] e in
      guard let strongSelf = self else { return }
      // Dispatch to the main queue before calling showError.
      DispatchQueue.main.async {
        strongSelf.showError(error: e)
      }
    })
    .retryWhen(retryHandler)
    .catchError { error in
      // Replace the error with a fallback element instead of propagating it.
      if let text = text, let cachedData = self.cache[text] {
        return Observable.just(cachedData)
      } else {
        return Observable.just(ApiController.Weather.empty)
      }
    }
}
错误重试机制
关于重试,上面的代码已经涉及到了 .retryWhen(retryHandler),具体的处理实现如下:
1 2 3 4 5 6 7 8 9 10 11
// Decides, for every error emitted by the source, whether and when to retry.
// The returned observable's next event triggers a resubscription; an error
// event gives up and propagates that error.
let retryHandler: (Observable<Error>) -> Observable<Int> = { e in
  return e.enumerated().flatMap { (attempt, error) -> Observable<Int> in
    if attempt >= maxAttempts - 1 {
      // Attempt budget used up: forward the error instead of retrying.
      return Observable.error(error)
    } else if let casted = error as? ApiController.ApiError, casted == .invalidKey {
      // Invalid API key: hold the retry until apiKey emits a non-empty value.
      return ApiController.shared.apiKey.filter { $0 != "" }.map { _ in return 1 }
    }
    // Any other error: wait (attempt + 1) seconds, then retry once.
    print("== retrying after \(attempt + 1) seconds ==")
    return Observable<Int>.timer(Double(attempt + 1), scheduler: MainScheduler.instance).take(1)
  }
}
或者简化版:.retry(3)
自定义错误
创建自定义错误遵循 Swift 的一般原则,优秀的 Swift 程序员对此都不会陌生;不过,了解一下如何处理错误以及如何创建自定义操作符仍然是有益的。
创建自定义错误
先定义错误枚举,然后在结果过滤中抛出定义的错误即可。
1 2 3 4 5
/// Custom errors for the API layer; thrown while filtering request results.
enum ApiError: Error {
  case cityNotFound
  case serverFailure
  case invalidKey
}
fruit 默认是在主线程上生成的,但更好的做法是把它移到后台线程。要在后台线程中创建 fruit,必须使用 subscribeOn。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Emits three fruit names, sleeping two seconds between emissions on whichever
// thread runs this subscription closure. No completion event is sent.
let fruit = Observable<String>.create { observer in
  observer.onNext("[apple]")
  sleep(2)
  observer.onNext("[pineapple]")
  sleep(2)
  observer.onNext("[strawberry]")
  return Disposables.create()
}
// Subscribe to `fruit` on globalScheduler, then switch delivery to the main
// scheduler before the final subscription.
// NOTE(review): `dump()` and `dumpingSubscription()` are helpers defined
// elsewhere; presumably they log each event with its thread. Confirm.
let fruitOnBackground = fruit
  .subscribeOn(globalScheduler)
  .dump()

fruitOnBackground
  .observeOn(MainScheduler.instance)
  .dumpingSubscription()
  .disposed(by: bag)
00s | [D] [dog] received on Main Thread 00s | [S] [dog] received on Anonymous Thread 03s | [D] [cat] received on Animals Thread 03s | [S] [cat] received on Anonymous Thread 06s | [D] [tiger] received on Animals Thread 06s | [S] [tiger] received on Anonymous Thread 09s | [D] [fox] received on Animals Thread 09s | [S] [fox] received on Anonymous Thread 12s | [D] [leopard] received on Animals Thread 12s | [S] [leopard] received on Anonymous Thread
// Scheduler backed by the global concurrent dispatch queue.
let globalScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
// Holds the subscriptions created in this example.
let bag = DisposeBag()
// Subject seeded with "[dog]" as its initial value.
let animal = BehaviorSubject(value: "[dog]")
// Emits three fruit names, sleeping two seconds between emissions on whichever
// thread runs this subscription closure. No completion event is sent.
let fruit = Observable<String>.create { observer in
  observer.onNext("[apple]")
  sleep(2)
  observer.onNext("[pineapple]")
  sleep(2)
  observer.onNext("[strawberry]")
  return Disposables.create()
}
00s | [D] [apple] received on Anonymous Thread 00s | [S] [apple] received on Main Thread 02s | [D] [pineapple] received on Anonymous Thread 02s | [S] [pineapple] received on Main Thread 04s | [D] [strawberry] received on Anonymous Thread 04s | [S] [strawberry] received on Main Thread
/// Checks that blocking on an observable subscribed on a background scheduler
/// collects every element into an array.
func testToArray() {
  let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
  let toArrayObservable = Observable.of(1, 2).subscribeOn(scheduler)
  // Plain `try` inside the assertion: a thrown error is recorded as a test
  // failure instead of crashing the test process the way `try!` would.
  XCTAssertEqual(try toArrayObservable.toBlocking().toArray(), [1, 2])
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/// Checks the same sequence as `testToArray`, but through `materialize()`,
/// which returns the outcome as a value instead of throwing.
func testToArrayMaterialized() {
  let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)

  let toArrayObservable = Observable.of(1, 2).subscribeOn(scheduler)

  let result = toArrayObservable
    .toBlocking()
    .materialize()

  switch result {
  case .completed(elements: let elements):
    XCTAssertEqual(elements, [1, 2])
  case .failed(_, error: let error):
    // The sequence errored: fail the test with the error's description.
    XCTFail(error.localizedDescription)
  }
}
funcresponse(request: URLRequest) -> Observable<(HTTPURLResponse, Data)> { returnObservable.create { observer in // content goes here
let task =self.base.dataTask(with: request) { (data, response, error) in guardlet response = response, let data = data else { observer.on(.error(error ??RxURLSessionError.unknown)) return }
let s =URLSession.shared.rx.data(request: request) .observeOn(MainScheduler.instance) .subscribe(onNext: { imageData in self.gifImageView.animate(withGIFData: imageData) self.activityIndicator.stopAnimating() }) disposable.setDisposable(s) }
/// Tests for the `URLSession.rx` request helpers, run against stubbed HTTP
/// responses so that no real network traffic is needed.
class iGifTests: XCTestCase {
  /// JSON object served by the stub for `request`.
  let obj = ["array": ["foo", "bar"], "foo": "bar"] as [String: Any]
  /// Request whose host is stubbed with a 200 JSON response.
  let request = URLRequest(url: URL(string: "http://raywenderlich.com")!)
  /// Request whose host is stubbed with `RxURLSessionError.unknown`.
  let errorRequest = URLRequest(url: URL(string: "http://rw.com")!)

  override func setUp() {
    super.setUp()
    // Put setup code here. This method is called before the invocation of each test method in the class.
    stub(condition: isHost("raywenderlich.com")) { _ in
      return OHHTTPStubsResponse(jsonObject: self.obj, statusCode: 200, headers: nil)
    }
    stub(condition: isHost("rw.com")) { _ in
      return OHHTTPStubsResponse(error: RxURLSessionError.unknown)
    }
  }

  override func tearDown() {
    // Put teardown code here. This method is called after the invocation of each test method in the class.
    super.tearDown()
    OHHTTPStubs.removeAllStubs()
  }

  /// The data request emits a non-nil first element.
  func testData() {
    let observable = URLSession.shared.rx.data(request: self.request)
    expect(observable.toBlocking().firstOrNil()).toNot(beNil())
  }

  /// The string request emits the stubbed JSON as text.
  func testString() {
    let observable = URLSession.shared.rx.string(request: self.request)
    let string = "{\"array\":[\"foo\",\"bar\"],\"foo\":\"bar\"}"
    expect(observable.toBlocking().firstOrNil()) == string
  }

  /// The JSON request emits a value equal to the parsed stub payload.
  func testJSON() {
    let observable = URLSession.shared.rx.json(request: self.request)
    let string = "{\"array\":[\"foo\",\"bar\"],\"foo\":\"bar\"}"
    let json = try? JSON(data: string.data(using: .utf8)!)
    expect(observable.toBlocking().firstOrNil()) == json
  }

  /// The stubbed failure surfaces as `RxURLSessionError.unknown`.
  func testError() {
    var erroredCorrectly = false
    let observable = URLSession.shared.rx.json(request: self.errorRequest)
    do {
      let _ = try observable.toBlocking().first()
      // XCTFail records a failure; assertionFailure would trap the test runner.
      XCTFail("Expected the request to fail, but it produced a value")
    } catch RxURLSessionError.unknown {
      erroredCorrectly = true
    } catch {
      XCTFail("Expected RxURLSessionError.unknown, got \(error)")
    }
    expect(erroredCorrectly) == true
  }
}