RxSwift

写在前面:

  • 前面的相对简单,看看example和说明即可,后面的相对复杂会有阅读笔记记录理解,测试的过程和应用场景的思考。
  • RxSwift 为我们提供了 50 多个操作符,我建议你到 rxmarbles.com 或者到 App Store 下载 RxMarbles App,并在 App 中替换各种参数来观察执行的结果
  • 想要用一个固定序列模拟延迟的发送,参考如下代码:
Observable.from([10, 20, 30]).delaywhen( x => timer(x))

Concepts

Every Observable instance is just a sequence.

The key advantage for an Observable sequence vs. Swift's Sequence is that it can also receive elements asynchronously. This is the essence of RxSwift. Everything else expands upon this concept.

  • An Observable (ObservableType) is equivalent to a Sequence.
  • The ObservableType.subscribe(_:) method is equivalent to Sequence.makeIterator().
  • ObservableType.subscribe(_:) takes an observer (ObserverType) parameter, which will be subscribed to automatically receive sequence events and elements emitted by the Observable, instead of manually calling next() on the returned generator.

Observables will not execute their subscription closure unless there is a subscriber.

  _ = Observable<String>.create { observerOfString in
            print("Observable created")
            observerOfString.on(.next("😉"))
            observerOfString.on(.completed)
            return Disposables.create()
        }
        .subscribe { event in
            print(event)
    }

Creating and Subscribing to Observables

There are several ways to create and subscribe to Observable sequences.

never

Creates a sequence that never terminates and never emits any events. More info

let disposeBag = DisposeBag()
let neverSequence = Observable<String>.never()

let neverSequenceSubscription = neverSequence
    .subscribe { _ in
        print("This will never be printed")
}

neverSequenceSubscription.disposed(by: disposeBag)

empty

Creates an empty Observable sequence that only emits a Completed event. More info

通常用来发一个成功信号。

let disposeBag = DisposeBag()

Observable<Int>.empty()
    .subscribe { event in
        print(event)
    }
    .disposed(by: disposeBag)

This example also introduces chaining together creating and subscribing to an Observable sequence.

just

Creates an Observable sequence with a single element. More info

let disposeBag = DisposeBag()

Observable.just("🔴")
    .subscribe { event in
        print(event)
    }
    .disposed(by: disposeBag)

of

Creates an Observable sequence with a fixed number of elements.

let disposeBag = DisposeBag()

Observable.of("🐶", "🐱", "🐭", "🐹")
    .subscribe(onNext: { element in
        print(element)
    })
    .disposed(by: disposeBag)
  • This example also introduces using the subscribe(onNext:) convenience method. Unlike subscribe(_:), which subscribes an event handler for all event types (Next, Error, and Completed), subscribe(onNext:) subscribes an element handler that will ignore Error and Completed events and only produce Next event elements.
  • There are also subscribe(onError:) and subscribe(onCompleted:) convenience methods, should you only want to subscribe to those event types.
  • 分类监听的api同时还把elementevent里解包了出来。
  • And there is a subscribe(onNext:onError:onCompleted:onDisposed:) method, which allows you to react to one or more event types and when the subscription is terminated for any reason, or disposed, in a single call:
 someObservable.subscribe(
     onNext: { print("Element:", $0) },
     onError: { print("Error:", $0) },
     onCompleted: { print("Completed") },
     onDisposed: { print("Disposed") }
 )

from

Creates an Observable sequence from a Sequence, such as an Array, Dictionary, or Set.

let disposeBag = DisposeBag()

Observable.from(["🐶", "🐱", "🐭", "🐹"])
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

This example also demonstrates using the default argument name $0 instead of explicitly naming the argument.

offrom的区别:

  • of是不定长参数,如果传入了一个数组,意思是只传了一个(数组)元素

  • from则可以相当于解包了再of

    create

    Creates a custom Observable sequence. More info

let disposeBag = DisposeBag()

let myJust = { (element: String) -> Observable<String> in
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}
    
myJust("🔴")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

range

Creates an Observable sequence that emits a range of sequential integers and then terminates. More info

let disposeBag = DisposeBag()

Observable.range(start: 1, count: 10)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

repeatElement

Creates an Observable sequence that emits the given element indefinitely. More info

let disposeBag = DisposeBag()

Observable.repeatElement("🔴")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

This example also introduces using the take operator to return a specified number of elements from the start of a sequence.

generate

Creates an Observable sequence that generates values for as long as the provided condition evaluates to true.

let disposeBag = DisposeBag()

Observable.generate(
        initialState: 0,
        condition: { $0 < 3 },
        iterate: { $0 + 1 }
    )
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

rx版的for循环

deferred

Creates a new Observable sequence for each subscriber. More info

let disposeBag = DisposeBag()
var count = 1

let deferredSequence = Observable<String>.deferred {
    print("Creating \(count)")
    count += 1
    
    return Observable.create { observer in
        print("Emitting...")
        observer.onNext("🐶")
        observer.onNext("🐱")
        observer.onNext("🐵")
        return Disposables.create()
    }
}

deferredSequence
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

deferredSequence
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出:

Creating 1
Emitting...
🐶
🐱
🐵
Creating 2
Emitting...
🐶
🐱
🐵

作用是什么?目前看到的貌似是为每一个说订阅者都新建了一个序列?因为count累加了代表代码块进了数次,每次进入都有一个create方法应该被执行了。

error

Creates an Observable sequence that emits no items and immediately terminates with an error.

let disposeBag = DisposeBag()
    
Observable<Int>.error(TestError.test)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

empty作用差不多,一个发结束信号,一个发错误信号

doOn

Invokes a side-effect action for each emitted event and returns (passes through) the original event. More info

let disposeBag = DisposeBag()

Observable.of("🍎", "🍐", "🍊", "🍋")
    .do(onNext: { print("Intercepted:", $0) }, afterNext: { print("Intercepted after:", $0) }, onError: { print("Intercepted error:", $0) }, afterError: { print("Intercepted after error:", $0) }, onCompleted: { print("Completed")  }, afterCompleted: { print("After completed")  })
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出:

Intercepted: 🍎
🍎
Intercepted after: 🍎
Intercepted: 🍐
🍐
Intercepted after: 🍐
Intercepted: 🍊
🍊
Intercepted after: 🍊
Intercepted: 🍋
🍋
Intercepted after: 🍋
Completed
After completed

可见要hook的话就在这里了

There are also doOnNext(_:), doOnError(_:), and doOnCompleted(_:) convenience methods to intercept those specific events, and doOn(onNext:onError:onCompleted:) to intercept one or more events in a single call.

Working with Subjects

  • A Subject is a sort of bridge or proxy that is available in some implementations of Rx that acts as both an observer and Observable. (先接再发)
  • Because it is an observer, it can subscribe to one or more Observables,
  • and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
  • More info
extension ObservableType {
    
    /**
     Add observer with `id` and print each emitted event.
     - parameter id: an identifier for the subscription.
     */
    func addObserver(_ id: String) -> Disposable {
        subscribe { print("Subscription:", id, "Event:", $0) }
    }
    
}

func writeSequenceToConsole<Source: ObservableType>(name: String, sequence: Source) -> Disposable {
    return sequence.subscribe { event in
        print("Subscription: \(name), event: \(event)")
    }
}

PublishSubject

Broadcasts new events to all observers as of their time of the subscription.

    let disposeBag = DisposeBag()
    let subject = PublishSubject<String>()
    
    subject.addObserver("1").disposed(by: disposeBag)
    subject.onNext("🐶")
    subject.onNext("🐱")
    
    subject.addObserver("2").disposed(by: disposeBag)
    subject.onNext("🅰️")
    subject.onNext("🅱️")
}

输出:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

即只广播订阅后的内容。上例中从“A”开始,多了个订阅者,所以每个订阅者都能接到next事件了

This example also introduces using the onNext(_:) convenience method, equivalent to on(.next(_:), which causes a new Next event to be emitted to subscribers with the provided element. There are also onError(_:) and onCompleted() convenience methods, equivalent to on(.error(_:)) and on(.completed), respectively.

ReplaySubject

Broadcasts new events to all subscribers, and the specified bufferSize number of previous events to new subscribers.

let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

ReplaySubject可以设置回溯的长度,因此这里在广播了A的时候增加了订阅者,订阅前的🐱也是能广播出去的:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

BehaviorSubject

Broadcasts new events to all subscribers, and the most recent (or initial) value to new subscribers.

let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")

subject.addObserver("1").disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")

subject.addObserver("3").disposed(by: disposeBag)
subject.onNext("🍐")
subject.onNext("🍊")

behaviorSubject只能向上回溯一个事件,也因此你需要提供一个初始值,以备回溯不到的时候用来填充。也就是说,基本上等于replay设置为1,但是replay在回溯不到的时候不需要初始值。

Notice what's missing in these previous examples? A Completed event. PublishSubject, ReplaySubject, and BehaviorSubject do not automatically emit Completed events when they are about to be disposed of.

Combination Operators

Operators that combine multiple source Observables into a single Observable.

startWith

Emits the specified sequence of elements before beginning to emit the elements from the source Observable. More info

let disposeBag = DisposeBag()

Observable.of("🐶", "🐱", "🐭", "🐹")
    .startWith("1️⃣")
    .startWith("2️⃣")
    .startWith("3️⃣", "🅰️", "🅱️")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
  • startWith就是添加序列(源)

  • 输出的顺序是【3️⃣🅰️🅱️ 2️⃣ 1️⃣ 🐶🐱🐭🐹】,也就是不同的源是后出先出(LIFO)的原则,但是同一个源内是先入先出(FIFO)的,仔细观察一下,我特地分了组

    merge

    Combines elements from source Observable sequences into a single new Observable sequence, and will emit each element as it is emitted by each source Observable sequence. More info

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")
subject1.onNext("🅱️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("🆎")
subject2.onNext("③")

这里需要观察语法,它能merge的前提它初始化序列用的是两个序列而不是两个元素。

思考题:仅仅只把两个序列丢进构造器而不是merge会怎样?

答案
RxSwift.PublishSubject
RxSwift.PublishSubject
仅仅是输出了两个subject。等于你就是订阅了外层事件,`merge`后相当于展开了订阅

那么问题2,普通元素初始化的序列,你去merge会产生什么样的输出?

Observable.of("1","2","3","4")
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
答案
expression failed to parse:
error: MyPlayground.playground:21:12: error: static member 'merge' cannot be used on instance of type 'Observable<_>'
Observable.of("1","2","3","4")
~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~
Observable<_>
可见是不行的。

zip

Combines up to 8 source Observable sequences into a single new Observable sequence, and will emit from the combined Observable sequence the elements from each of the source Observable sequences at the corresponding index. More info

这个图片已经描述得很清楚了,并且没有pair上的话,就不会广播了(短板理论)

let disposeBag = DisposeBag()

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")

intSubject.onNext(1)
intSubject.onNext(2)

stringSubject.onNext("🆎")
intSubject.onNext(3)

注意一下具体语法与merge的区别。

  • merge是用序列来创建序列后的一个实例方法
  • zip是个类方法(或理解为等同于of, from之类的构造器)
  • 它是把每个序列按各自的广播顺序取同索引的值,然后再组织成一个回调方法的N个参数(你用$0, $1...取也行)
  • 所以能广播几次,由最短的那个序列决定
  • 看上例,不管序列1广播了几次,直到序列2广播出第一个,产生一次广播 输出如下:
🅰️ 1
🅱️ 2
🆎 3

combineLatest

Combines up to 8 source Observable sequences into a single new Observable sequence, and will begin emitting from the combined Observable sequence the latest elements of each source Observable sequence once all source sequences have emitted at least one element, and also when any of the source Observable sequences emits a new element. More info

这个看图也描述清楚了,zip在你有广播,但别的序列没广播的时候,就死等,而combineLatest却直接把别的序列里最新的一条拿过来,形成一次广播,不管这条是新的还是已经广播过一次(或N次)的f

这个用来广播比赛成绩非常适合,不管哪个队的比分有变化了,都把所有选手的实时比分广播出来。

let disposeBag = DisposeBag()

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")
intSubject.onNext(1)
intSubject.onNext(2)

stringSubject.onNext("🆎")

能猜到输出是这样的:

🅱️ 1
🅱️ 2
🆎 2

当前所有序列里的最后一个。

注意,上例Observable.combineLatest的返回类型是Observable<String>,取决于closure里面是怎么返回的,但是在playground里的输出(以及跟踪源码),显示是CombineLatest2<String, Int, String>

There is also a variant of combineLatest that takes an Array (or any other collection of Observable sequences):

let disposeBag = DisposeBag()

let stringObservable = Observable.just("❤️")
let fruitObservable = Observable.from(["🍎", "🍐", "🍊"])
let animalObservable = Observable.of("🐶", "🐱", "🐭", "🐹")

Observable.combineLatest([stringObservable, fruitObservable, animalObservable]) {
        "\($0[0]) \($0[1]) \($0[2])"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

这种直接用array来初始化的序列与你手动广播的区别就在它没有等待时间,不存在什么我广播的时候,另一个序列还没广播(即还没有发送next事件),所以它其实是按索引来取的。如果索引越界,则取最后一个。

Because the combineLatest variant that takes a collection passes an array of values to the selector function, it requires that all source Observable sequences are of the same type.(即这种方式要求序列元素类型相同,而采用不定长参数的方式则不需要,因为closuer里有8个重载方法,对应最多8个序列里的元素,每个重载的参数个数从2到8,自然可以有8个不同的类型)

验证下输出:

❤️ 🍎 🐶
❤️ 🍐 🐶
❤️ 🍐 🐱
❤️ 🍊 🐱
❤️ 🍊 🐭
❤️ 🍊 🐹

switchLatest

Transforms the elements emitted by an Observable sequence into Observable sequences, and emits elements from the most recent inner Observable sequence. More info

这里涉及了inner subject图片完全没看明白,还是看代码:

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")

let subjectsSubject = BehaviorSubject(value: subject1)
    
subjectsSubject.asObservable()
    .switchLatest()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🏈")
subject1.onNext("🏀")

subjectsSubject.onNext(subject2)

subject1.onNext("⚾️")

subject2.onNext("🍐")

先看输出:

⚽️
🏈
🏀
----这里广播了新序列2,序列1里的广播就不再监听了
🍎
🍐
  1. 所以如果有听监听最后一个序列的需求,就switchLatest一下。
  2. switchLatest的,广播出去的元素必须conform to 'ObservableConvertibleType'
  3. 前面几个是类方法,这个是实例方法,注意。

withLatestFrom

Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any.

let disposeBag = DisposeBag()

let foodSubject = PublishSubject<String>()
let drinksSubject = PublishSubject<String>()

foodSubject.asObservable()
    .withLatestFrom(drinksSubject) { "\($0) + \($1)" }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

foodSubject.onNext("🥗")

drinksSubject.onNext("☕️")
foodSubject.onNext("🥐")

drinksSubject.onNext("🍷")
foodSubject.onNext("🍔")

foodSubject.onNext("🍟")

drinksSubject.onNext("🍾")

输出(没看懂):

🥐 + ☕️
🍔 + 🍷
🍟 + 🍷

In this example 🥗 is not printed because drinksSubject did not emit any values before 🥗 was received. The last drink (🍾) will be printed whenever foodSubject will emit another event. -> 事实上drink (🍾)也没有打印(版本问题?)

强行解释:

  1. 序列1广播的时候,序列2没有元素,忽略
  2. 序列2广播的时候,序列1没有元素(忽略过的就无效了),但序列2不忽略
  3. 序列1广播的时候,序列2有元素,就组成一次广播
  4. 重复2和3,又组成了一次广播
  5. 序列1再次广播,序列2没有新元素,于是就使用了上一次的元素,组成一次广播
  6. 序列2再广播,序列1没有新元素,不广播

可见,withLatestFrom的意思是我本身只消费新元素,没有匹配就作废,但是那个latestFrom的对象序列是永远取最后一个就行,哪怕你广播了100遍,它只广播了一遍,那一个元素也会到这100次广播里。

验证,我在最后一行后加上:

drinksSubject.onNext("🍾")  // 这里上例的最后一行
foodSubject.onNext("1")
foodSubject.onNext("2")
foodSubject.onNext("3")
foodSubject.onNext("4")
foodSubject.onNext("5")

按我上面的推论,序列1永远用最新的,序列2会一直用最后一个🍾,而不会产生丢弃,实测是正确的:

1 + 🍾
2 + 🍾
3 + 🍾
4 + 🍾
5 + 🍾

它的使用场景好像是:

  • 如果你关联的序列没有广播过,那么整个序列将不会产生任何广播
  • 一旦关联序列开始广播,那么将以你的广播为主,拼上关联序列的最新一个广播(不管这个元素被关联过多少次)。

比如一个关联的状态机,还没有状态的时候不做任何业务(即不广播已经产生的业务数据),一旦广播过一次状态,那么以后所有的业务都把当前“最新”的状态附加到业务数据里去,供调用者来使用。(只是根据实测,状态机有数据前的业务数据就全部丢弃了,它只适用于会重复刷新数据的场景)

Transforming Operators

Operators that transform Next event elements emitted by an Observable sequence.

map

Applies a transforming closure to elements emitted by an Observable sequence, and returns a new Observable sequence of the transformed elements. More info

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .map { $0 * $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

compactMap

compactMap常用于过滤掉值为nil的操作符,你可以把 compactMap 理解为同时使用 filter 和 map 的两个操作符。filter 把nil的值过滤掉,而 map 把非空的值进行转换。

Observable.of("1", "not-a-number", "2")
    .compactMap { Int($0) }
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

flatMap and skill

  • Transforms the elements emitted by an Observable sequence into Observable sequences,

  • and merges the emissions from both Observable sequences into a single Observable sequence.

    This is also useful when, for example,

  • when you have an Observable sequence that itself emits Observable sequences,

  • and you want to be able to react to new emissions from either Observable sequence.

  • The difference between flatMap and flatMapLatest is, flatMapLatest will only emit elements from the most recent inner Observable sequence. More info

let disposeBag = DisposeBag()

struct Player {
    init(score: Int) {
        self.score = BehaviorSubject(value: score)
    }

    let score: BehaviorSubject<Int>
}

let 👦🏻 = Player(score: 80)
let 👧🏼 = Player(score: 90)

let player = BehaviorSubject(value: 👦🏻)

player.asObservable()
    .flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

👦🏻.score.onNext(85)

player.onNext(👧🏼)

👦🏻.score.onNext(95) // Will be printed when using flatMap, but will not be printed when using flatMapLatest

👧🏼.score.onNext(100)

这个例子有内外两层序列

  1. 队员序列,有新队员就会广播出来
  2. 队员的分数序列,显然分数是会变化的,因此也构造成一个序列
  3. 在监听队员序列的时候,目的其实是监听每个队员的分数变化,因此flatMap一下
  4. 下面会说flatMapLatest其实是mapswitchLatest的组合

归根结底,flatmap的区别在不flat的话,只会监听表层(上例则只会产生两个添加了player的广播),如果你是想监听每个对象里的序列,则需要flat一下。

In this example, using flatMap may have unintended consequences. After assigning 👧🏼 to player.value, 👧🏼.score will begin to emit elements, but the previous inner Observable sequence (👦🏻.score) will also still emit elements. By changing flatMap to flatMapLatest, only the most recent inner Observable sequence (👧🏼.score) will emit elements, i.e., setting 👦🏻.score.value to 95 has no effect.

flatMapLatest is actually a combination of the map and switchLatest operators. (自己尝试改写下?)

同理,flatMapFirst则只会监听第一个。

三种场景下的输出:(flatMap, latest, first),其中括号内数值就是latest序列里的:

80, 85, (90), 95, (100)
80, 85, (90), (100)
80, 85, 95

插一个练习,我想把运动员名字和得分一起输出,因此加了一个name字段。 因为得分是一个序列,于是我考虑combine两个序列(名字序列只更新一次,所以用了combineLatest:

struct Player {
    init(score: Int, name: String) {
        self.score = BehaviorSubject(value: score)
        self.name = BehaviorSubject(value: name)
        self.combine = Observable.combineLatest(self.name, self.score) {
            "\($0) \($1)"
        }
    }

    let score: BehaviorSubject<Int>
    let name: BehaviorSubject<String>
    let combine: Observable<String>    
}

这样监听的时候监听combine字段就行了。

后来觉得这样是不是太重了,我的真实需求应该是map :

self.combine = self.score.asObservable().map { "\(name): \($0)" }

再插一个例子,上例是一个动态的subject,现在来一个全写死的简单例子:

struct TemperatureSensor {
  let temperature: Observable<Int>
}
let sensor1 = TemperatureSensor(temperature: Observable.of(21, 23))
let sensor2 = TemperatureSensor(temperature: Observable.of(22, 25))
Observable.of(sensor1, sensor2)
    .flatMap { $0.temperature } // 监听里层的序列
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

输出:21, 23, 22, 25,就是说一条序列完了再广播另一个。

scan

Begins with an initial seed value, and then applies an accumulator closure to each element emitted by an Observable sequence, and returns each intermediate result as a single-element Observable sequence. More info

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .scan(1) { aggregateValue, newValue in
        aggregateValue + newValue
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// 简写:.scan(1) { $0 + $1} , 跟reduce函数API差不多

注意与后面要介绍的recude的区别

Filtering and Conditional Operators

Operators that selectively emit elements from a source Observable sequence.

filter

Emits only those elements from an Observable sequence that meet the specified condition. More info

let disposeBag = DisposeBag()

Observable.of(
    "🐱", "🐰", "🐶",
    "🐸", "🐱", "🐰",
    "🐹", "🐸", "🐱")
    .filter {
        $0 == "🐱"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

distinctUntilChanged

Suppresses sequential duplicate elements emitted by an Observable sequence. More info

看图,并不是想象中的distinct,而是只跟上一条广播相比,如果没有变化就不发送,但是历史上有一样的数据不管。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

连续发了三个🐱,显然只会打印一个。

elementAt

Emits only the element at the specified index of all elements emitted by an Observable sequence. More info

    let disposeBag = DisposeBag()
    
    Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
        .element(at: 3)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

single

  • Emits only the first element (or the first element that meets a condition) emitted by an Observable sequence.
  • Will throw an error if the Observable sequence does not emit exactly one element.
    • 所以不带条件的single和匹配出了多个元素的条件,都会报错
// example("single") 
let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)


// example("single with conditions")
let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single { $0 == "🐸" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Observable.of("🐱", "🐰", "🐶", "🐱", "🐰", "🐶")
    .single { $0 == "🐰" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single { $0 == "🔵" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

输出:

不带条件,报错:
🐱
Unhandled error happened: Sequence contains more than one element.
 subscription called from:

刚好一个,能正常结束:
next(🐸)
completed

多个匹配,报错
next(🐰)
error(Sequence contains more than one element.)

无匹配,报错
error(Sequence doesn't contain any elements.)

仔细观察,不带条件的直接single,监听到的已经是元素而不是事件了?

take

Emits only the specified number of elements from the beginning of an Observable sequence. More info

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

takeLast

Emits only the specified number of elements from the end of an Observable sequence. More info

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .takeLast(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

takeWhile

Emits elements from the beginning of an Observable sequence as long as the specified condition evaluates to true. More info

    let disposeBag = DisposeBag()
    
    Observable.of(1, 2, 3, 4, 5, 6)
        .take(while: { $0 < 4 })
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

注意,takeWhile在碰到第一个false时就退出了,它不能当conditions来使用。

takeUntil

Emits elements from a source Observable sequence until a reference Observable sequence emits an element. More info

    let disposeBag = DisposeBag()
    
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    
    sourceSequence
        .take(until: referenceSequence)
        .subscribe { print($0) }
        .disposed(by: disposeBag)
    
    sourceSequence.onNext("🐱")
    sourceSequence.onNext("🐰")
    sourceSequence.onNext("🐶")
    
    referenceSequence.onNext("🔴")
    
    sourceSequence.onNext("🐸")
    sourceSequence.onNext("🐷")
    sourceSequence.onNext("🐵")

即一旦另一个序列有了新消息,本序列的监听就结束了。

skip

Suppresses emitting the specified number of elements from the beginning of an Observable sequence. More info

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

skipWhile

Suppresses emitting the elements from the beginning of an Observable sequence that meet the specified condition. More info

let disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 5, 6)
    .skip(while: { $0 < 4 })
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

比较takeWhileskipWhile

  • take是从头开始取,一旦条件不满足,就不取了
    • 即使后面还有满足的,也不会取了
    • 如果第一个就不满足,那就不会发送任何消息
  • skip是从头开始“过滤”,一理条件不满足,就会通通广播出来
    • 即使后面还有满足过滤条件的,也不会skip
    • 如果第一个就不需要过滤,那整个序列都不会过滤(全部广播出来)
  • 所以take就是从头取到不满足条件为止,skip就是从满足条件的位置开始取到尾巴。

例:

// 意思是从第一个小于13的元素开始广播(即全序列)
// 你的可以认为第一个条件就失败了,所以是全序列
Observable.of(11, 12, 13, 14, 15, 16)
    .skipWhile { $0 >= 13}

// 意思是从头开始取,直到小于13(即空序列)
// 或者理解为,第一个元素就是false,那么就不返了(即空序列)
Observable.of(11, 12, 13, 14, 15, 16)
    .takeWhile { $0 >= 13 }

skipWhileWithIndex

Suppresses emitting the elements from the beginning of an Observable sequence that meet the specified condition, and emits the remaining elements. The closure is also passed each element's index.

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .enumerated()
    .skipWhile { $0.index < 3 }
    .map { $0.element }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

首先,不要当成条件过滤器,它仍然是从“头”过滤。其次,我写demo的RxSwift版本比较老,上例需要改成:

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
//    .enumerated()
//    .skipWhile { $0.index < 3 }
//    .map { $0.element }
    .skipWhileWithIndex { $1 < 3 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
  1. 需要enumerated一下,即新版Observable对象其实已经没有这个方法了

  2. skipWhile一个是enumerated对象的方法,一个是可观察对象的方法,注意区别

    • 所以方法名一致,但参数不一致(一个index在0,一个在1)
  3. enumerated其实用indexelement包装了一下,所以要map出来

    skipUntil

    Suppresses emitting the elements from a source Observable sequence until a reference Observable sequence emits an element. More info

    即一直忽略序列里的信号,直到关联序列有了值,才开始广播。(至于图中的是个双向开关,第一次是打开,第二次是关闭,没演示出来)

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .skipUntil(referenceSequence)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

// 按图示,我以为会继续输出11,但22,33不会输出
// 在5.0.0版的RxSwift上测试结果是全部输出了
sourceSequence.onNext("11")
referenceSequence.onNext("🔴")
sourceSequence.onNext("22")
sourceSequence.onNext("33")

除了filter,其它方法都只能按顺序判断,碰到条件不符合就中止了,filter则是会每个元素进行判断。

Mathematical and Aggregate Operators

Operators that operate on the entire sequence of items emitted by an Observable.

toArray

Converts an Observable sequence into an array, emits that array as a new single-element Observable sequence, and then terminates. More info

let disposeBag = DisposeBag()

Observable.range(start: 1, count: 10)
    .toArray()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

输出:

success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

注意,发送的已经不是next, complete等event了,直接就是success

reduce

Begins with an initial seed value, and then applies an accumulator closure to all elements emitted by an Observable sequence, and returns the aggregate result as a single-element Observable sequence. More info

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .reduce(1, accumulator: +)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

首先,那个accumulator,你可以写完整形式

{ aggregateValue, newValue in
    aggregateValue + newValue
}

或简写成

{ $0 + $1 }

因为仅仅就是一个相加,所以最简单的还是一个+符号

打印输出,就能看出与scan的区别在于,scan是把聚合的过程都广播出来了,而reduce只会广播聚合后的结果。

concat

Joins elements from inner Observable sequences of an Observable sequence in a sequential manner, waiting for each sequence to terminate successfully before emitting elements from the next sequence. More info

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let subjectsSubject = BehaviorSubject(value: subject1)

subjectsSubject.asObservable()
    .concat()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

subjectsSubject.onNext(subject2)

subject2.onNext("I would be ignored")
subject2.onNext("🐱")
subject2.onNext("🐱2")
subject2.onNext("🐱3")

subject1.onCompleted()

subject2.onNext("🐭")

输出:

next(🍎)
next(🍐)
next(🍊)
next(🐱3) // 前3条全部被忽略了(其实是前4条,因为初始值有个🐶)
next(🐭)
  1. 与图示不一样,拼接的序列在第个序列complete之前广播的元素都会被忽略
    • 除了最后一个
  2. 从example的写法可以看出,concat方法貌似是打开了一个能力,参考withLatestFrom
foodSubject.asObservable()
    .withLatestFrom(drinksSubject) { "\($0) + \($1)" }

也是给序列调过某个方法之后,后续给这个序列喂一个序列(作为新元素)时,这个父序列才知道怎么处理,比如这里是处理为concat,即拼接。

解释上述第2条,concat方法单独调用,其实只是用在subject上的用法而已,普通序列的话则可以这样用:

Observable.of(1, 2)
    .concat(Observable.of(3, 4))
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

会正常输出1,2,3,4,是不是有点像startWith了?

所以subject这种可以中途订阅别人的序列,都有这种用法,下同。注意体会一下两种用法的差别。

Connectable Operators

Connectable Observable sequences resembles ordinary Observable sequences, except that they not begin emitting elements when subscribed to, but instead, only when their connect() method is called. In this way, you can wait for all intended subscribers to subscribe to a connectable Observable sequence before it begins emitting elements.

Within each example on this page is a commented-out method. Uncomment that method to run the example, and then comment it out again to stop running the example.

Before learning about connectable operators, let's take a look at an example of a non-connectable operator:

// 本章会大量用到的delay方法
func delay(_ delay:Double, closure:@escaping ()->()) {
    DispatchQueue.main.asyncAfter(deadline: .now()+delay, execute: closure)
}

func sampleWithoutConnectableOperators() {
    printExampleHeader(#function)
    
    let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    
    _ = interval
        .subscribe(onNext: { print("Subscription: 1, Event: \($0)") })
    
    delay(5) {
        _ = interval
            .subscribe(onNext: { print("Subscription: 2, Event: \($0)") })
    }
}

//sampleWithoutConnectableOperators() // ⚠️ Uncomment to run this example; comment to stop running

输出:

Subscription: 1, Event: 0
Subscription: 1, Event: 1
Subscription: 1, Event: 2
Subscription: 1, Event: 3
Subscription: 1, Event: 4
Subscription: 1, Event: 5
Subscription: 2, Event: 0
Subscription: 1, Event: 6
Subscription: 2, Event: 1
Subscription: 1, Event: 7
Subscription: 2, Event: 2
Subscription: 1, Event: 8
Subscription: 2, Event: 3
...

interval creates an Observable sequence that emits elements after each period, on the specified scheduler. (发送的值就是次数) More info

注意,第二个订阅者在5秒后订阅,但拿到的值是从0开始的。那么这个值来自哪呢?

  1. 来自序列,只不过是把历史的第一个挨个返出来(这样的话,你点了十个按钮后第二个监听者加入,他得到的是你点的第一个按钮?)

  2. 来自这个订阅机制,自动按接消息顺序给你编个号(不太可能)

    publish

    Converts the source Observable sequence into a connectable sequence. More info

func sampleWithPublish() {
    printExampleHeader(#function)
    
    let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        .publish()
    
    _ = intSequence
        .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
    delay(2) { _ = intSequence.connect() }
    
    delay(4) {
        _ = intSequence
            .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
    }
    
    delay(6) {
        _ = intSequence
            .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
    }
}

//sampleWithPublish() // ⚠️ Uncomment to run this example; comment to stop running

输出:

Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
...
  1. publish才能connect
    • publishconnect不会发送任何消息
    • 相比不带操作符的(依次接到消息),这里的订阅者会同时接到消息(自己跑一跑,看一下print的过程)
  2. 即使delay了很久,新的订阅者也能与老订阅者同时接到历史消息(离大谱,我5秒后才订阅,为什么能在第3秒时收到消息?)
  3. 而且新订阅也不再是从0开始了

解释一下第2条,被标记为publish的序列,是在connect之后才开始向订阅者广播的,所以如果你connect得很晚,即使后来的订阅者是delay了才开始订阅的,序列会发送哪些值是相对connect的时间的,比如上例:

  1. 2秒后connect(序列才开始发送,1秒后会发0出去)
  2. 4秒后订阅者2加入,时差为2,这个时候序列正好要发送1),所以两个订阅者应该同时收到1
  3. 6秒后订阅者3加入,这个时候序列正要发送3(相比4秒,又过了两秒,1+2嘛),那么三个订阅者会同时收到3

核心机制就是不管scheduler跑了多久,connect的时候才开始发送第一条,如果你delay了一个小时才connect,这期间陆陆续续有不同的订阅者加入,那么这一个小时内加入的所有订阅者都能从头接到消息。后来加入的就是什么时候加入就接到什么消息(不再从0开始了,与不用operater的区别)

Schedulers are an abstraction of mechanisms for performing work, such as on specific threads or dispatch queues. More info

replay

Converts the source Observable sequence into a connectable sequence, and will replay bufferSize number of previous emissions to each new subscriber. More info

func sampleWithReplayBuffer() {
    printExampleHeader(#function)
    
    let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        .replay(5)
    
    _ = intSequence
        .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
    delay(2) { _ = intSequence.connect() }
    
    delay(4) {
        _ = intSequence
            .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
    }
    
    delay(15) { // 特意把8改成了15
        _ = intSequence
            .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
    }
}

// sampleWithReplayBuffer() // ⚠️ Uncomment to run this example; comment to stop running

输出:

Subscription 1:, Event: 0
Subscription 2:, Event: 0 // 第4秒加入,但是缓存有5个
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
... // 都是成对的,因为此时两个订阅者
Subscription 1:, Event: 10
Subscription 2:, Event: 10
Subscription 1:, Event: 11
Subscription 2:, Event: 11
// 这里,一次性把12和之前的5个缓存的事件广播给了订阅者3
Subscription 3:, Event: 7
Subscription 3:, Event: 8
Subscription 3:, Event: 9
Subscription 3:, Event: 10
Subscription 3:, Event: 11
// 从这里开始所有订阅者都接收一样的消息了(3个)
Subscription 1:, Event: 12
Subscription 2:, Event: 12
Subscription 3:, Event: 12
Subscription 1:, Event: 13
Subscription 2:, Event: 13
Subscription 3:, Event: 13
...

上面输出已经有简单说明

  1. publish改为了replay

  2. 因为buferSize有5个,所以虽然订阅者2是在第4秒加入的,仍然能得到所有消息(可以自行把这些数值改大一些看看边界值)

  3. 如果错过的消自比较多(比如3号订阅者),那么就会给出最近的几个

    multicast

    Converts the source Observable sequence into a connectable sequence, and broadcasts its emissions via the specified subject.

func sampleWithMulticast() {
    printExampleHeader(#function)
    
    // 生成一个普通发布序列
    let subject = PublishSubject<Int>()
    // 添加一个订阅者
    _ = subject
        .subscribe(onNext: { print("Subject: \($0)") })
    
    // 定时器会往这个序列里注入的意思?
    let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        .multicast(subject)
    
    _ = intSequence
        .subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })
    
    delay(2) { _ = intSequence.connect() }
    
    delay(4) {
        _ = intSequence
            .subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
    }
    
    delay(6) {
        _ = intSequence
            .subscribe(onNext: { print("\tSubscription 3:, Event: \($0)") })
    }
}

//sampleWithMulticast() // ⚠️ Uncomment to run this example; comment to stop running

输出:

Subject: 0
    Subscription 1:, Event: 0
Subject: 1
    Subscription 1:, Event: 1
    Subscription 2:, Event: 1
Subject: 2
    Subscription 1:, Event: 2
    Subscription 2:, Event: 2
Subject: 3
    Subscription 1:, Event: 3
    Subscription 2:, Event: 3
    Subscription 3:, Event: 3
Subject: 4
    Subscription 1:, Event: 4
    Subscription 2:, Event: 4
    Subscription 3:, Event: 4
  1. opeartor改为了multicast,并且传入了一个序列
  2. 原来example里的部分没有变化,行为跟publish一致
  3. 看来multicast的作用就是能给另一个序列喂数据?
  4. 别的用subscribe方法订阅的还能选择时机(比如delay),multicast则不行,等同于没有delay地直接订阅了。

这一章结束了,到目前为止,有想过这些花活有什么使用场景没?

后面的例子,除了各种监听和connect的时机的差别,与不用任何operator的区别仅仅在于多个监听者是同时接到最新消息的,还是依次接到的,这是我观察到的区别。

这一章的例子都没有dispose,看看是不是schedule的原因

Error Handling Operators

Operators that help to recover from error notifications from an Observable.

catchErrorJustReturn

Recovers from an Error event by returning an Observable sequence that emits a single element and then terminates. More info

// 首先做一个TestError,后面都会用到
enum TestError : Error {
    case test
}

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
// V5版叫catchErrorJustReturn, 下同
    .catchAndReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

catchError

Recovers from an Error event by switching to the provided recovery Observable sequence. More info


let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catch {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

recoverySequence.onNext("😊11")
recoverySequence.onNext("😊22")
recoverySequence.onNext("😊33")
sequenceThatFails.onError(TestError.test)

recoverySequence.onNext("😊44")
recoverySequence.onNext("😊55")
recoverySequence.onNext("😊66")

recoverySequence.onNext("😊")

应该能猜到输出了,就是在出错的时候换一个源序列(监听者无感知):

next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊44)
next(😊55)
next(😊66)

retry

Recovers repeatedly Error events by resubscribing to the Observable sequence, indefinitely. More info

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")
    
    // 创造一个偶发错误,重试的时候会消失
    if count == 1 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }
    
    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()
    
    return Disposables.create()
}

sequenceThatErrors
    .retry()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

就是出错的时候重试一次:

🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭

retry(_:)

有重试自然有重试次数:

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")
    
    // 创建一个会出错5次的场景
    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }
    
    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()
    
    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

这次试了三次还有错,就把异常转出来了,之所以是unhandled error,是因为我们只监听了next

🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
Unhandled error happened: test
 subscription called from:

Debugging Operators

Operators to help debug Rx code.

debug

Prints out all subscriptions, events, and disposals.

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")
    
    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }
    
    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()
    
    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .debug()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

其中debug函数可添加一个标识符,添加不添加的差别如下:

// 添加
2023-06-16 11:25:46.751: 调试标记 -> subscribed
2023-06-16 11:25:46.758: 调试标记 -> Event next(🍎)
// 不添加
2023-06-16 11:26:25.435: MyPlayground.playground:32 (__lldb_expr_81) -> Event next(🍎)
2023-06-16 11:26:25.449: MyPlayground.playground:32 (__lldb_expr_81) -> Event next(🍐)

RxSwift.Resources.total

Provides a count of all Rx resource allocations, which is useful for detecting leaks during development.

#if NOT_IN_PLAYGROUND
#else
example("RxSwift.Resources.total") {
    print(RxSwift.Resources.total)
    
    let disposeBag = DisposeBag()
    
    print(RxSwift.Resources.total)
    
    let subject = BehaviorSubject(value: "🍎")
    
    let subscription1 = subject.subscribe(onNext: { print($0) })
    
    print(RxSwift.Resources.total)
    
    let subscription2 = subject.subscribe(onNext: { print($0) })
    
    print(RxSwift.Resources.total)
    
    subscription1.dispose()
    
    print(RxSwift.Resources.total)
    
    subscription2.dispose()
    
    print(RxSwift.Resources.total)
}
    
print(RxSwift.Resources.total)
#endif

RxSwift.Resources.total is not enabled by default, and should generally not be enabled in Release builds.

Follow these instructions to enable RxSwift.Resources.total in your project:

CocoaPods

  1. Add a post_install hook to your Podfile, e.g.:

    target 'AppTarget' do
    pod 'RxSwift'
    end
    
    post_install do |installer|
        installer.pods_project.targets.each do |target|
            if target.name == 'RxSwift'
                target.build_configurations.each do |config|
                    if config.name == 'Debug'
                        config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES']
                    end
                end
            end
        end
    end
    
  2. Run pod update.

  3. Build project (ProductBuild).

    Carthage

  4. Run carthage build --configuration Debug.

  5. Build project (ProductBuild).


Backlinks