RxSwift
概要
ReactiveX 自体は、非同期プログラミングをサポートする API 定義である。非同期プログラミングでは、複数のプログラムが非同期にイベントを飛ばしあい、関連しあっている。この非同期なイベント通知の流れを制御するためのインタフェースが提供されている。 ReactiveX のアイデアのベースには、Observerパターン がある。Observerパターン では、通知元と通知先として Subject, Observer の二者が存在する。Observer は、監視したい対象の Subject を subscribe する。Subject は subscribe している Observer への参照を保持して、イベント発生時に通知する。 Observerパターン が非同期プログラミングと相性が良いのは、Subject は非同期にイベントを通知するにもかかわらず Observer をブロックすることがなく、にもかかわらず Observer はいつでもイベントを受け取り処理することができるためである。 code:text
+------------+ notify +-----------+
| Subject | -------> | Obserbver |
|------------| |-----------|
| onChange() | | notify() |
+------------+ +-----------+
code:swift
struct Subject {
func onChange() {
self.observers.forEach { $0.notify() }
}
}
struct Observer {
func notify() {
// 何か処理
}
}
RxSwift ではこれらの概念がより拡張して実装されている。主な違いとしては以下がある。 Subject ではなく Observable という命名が利用されている
Observer の unsubscribe は Disposable というオブジェクトに切り出されている
通知されるイベントにコンテキスト (正常終了したか?異常終了したか?等) が付与され通知される
通知のチェーンや、複数の Observable からの複数の通知の合成等のために、様々な API が提供されている
ReactiveX では、Subject にあたる概念は Observable と称される。Observer は Observable を subscribe し、Observable は Observer に対しイベントを emit する。 code:text
+----------+ +------------+
| Observer | -- subscribe --> | Observable |
| | <--- emits ----- | |
+----------+ +------------+
Hot の場合、要素は作成され次第すぐに送出される。そのため、要素が作成された時点でその Observable を subscribe していた Observer にみがその要素を受け取ることができ、それより後に subscribe した Observer はそれらの要素を受け取ることはできず、subscribe した時点以降に作成された要素のみしか受け取れない。 RxSwift のドキュメントには、各々の特徴が下記のようにまとめられている。 table:hot/cold
Hot Observer Cold Observer
変数やプロパティ、マウス座やUI制御値,現在時間 HTTP,TCP通信等の非同期処理
大抵n個の要素を持つ 大抵1つの要素を持つ
計算資源がsubscribeしているObserver間で共有される 計算資源はsubscribeしているObserver間で独立
ステートフル ステートレス
Hot/Cold 変換は下記をよむとわかる。
ConnectableObseravable の実装は下記らへん。
下記のサイトもすごく良さそう!あとで読む
ライブラリの構成は以下のようになっている。
code:text
┌──────────────┐ ┌──────────────┐
│ RxCocoa ├────▶ RxRelay │
└───────┬──────┘ └──────┬───────┘
│ │
┌───────▼──────────────────▼───────┐
│ RxSwift │
└───────▲──────────────────▲───────┘
│ │
┌───────┴──────┐ ┌──────┴───────┐
│ RxTest │ │ RxBlocking │
└──────────────┘ └──────────────┘
RxSwift: ReactiveX で定義された Rx 標準を提供し、他の依存を持たない RxCocoa: iOS/macOS/watchOS/tvOS のための Cocoa 独自の機能を提供する
RxRelay: Subject のシンプルなラッパーである PublishRelay と BehaviorRelay を提供する
RxTest, RxBlocking: Rxベースなシステムのテスト機能を提供する
概念
ObserveableType:
subscribe(_:) によって Observer を登録できる
この時、Disposable を生成する
Disposable:
監視される側のオブジェクトから、unsubscribe の責務だけ切り離されたオブジェクト
dispose() を呼び出すことで、unsubscribe と同等の責務を果たす
DisposeBag:
複数の Disposable を保持する
dispose() によって、保持している Disposable を全て一度に dispose() できる
ObserverType:
on(_:) でイベントを受け取る
イベントは enum Event で包まれており、.next, .error, .completed という3つの状態をもつ
SubjectType:
Observable, Observer の両者の性質を持つ
下記は、RxSwift パッケージ内のクラスを一部抜き出したもの (Swift に抽象クラスはないが、ライブラリ内には無理やり抽象クラスっぽく利用されているクラス定義がいくつかあり、それを表現している。また、UML には extension をうまく表現する記法がないので、点線で囲んで表現している)。ObservableType, ObserverType, SubjectType, Disposable が各々 protocol として定義されていて、そのデフォルト実装が extension で行われている。 https://gyazo.com/1bea335998932f89a3a2899bc337c41c
通知のフロー
登場人物とその責務を俯瞰したところで、実際のコードを書いてみる。何はともあれ、通知元である Observable が存在しなくては話が始まらないため、まずは Observable を作成してみたい。
Observable を作成するシンプルな方法は、Create オペレータを利用することである。これは、RxSwift の実装的には、Observable の振る舞いを subscribe として受け渡すと、内部でそれを利用して AnonymousObservable のインスタンスを生成し返す。引数として Observer を受け取るため、必要に応じて Observer に通知を送信する。 code:swift
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
Observer に通知する他に、Disposables を返す必要があるため、こちらも生成する必要がある。Disposables にも同様に create オペレータが存在する。引数なしで create() を呼び出すと、内部的には NopDisposable が生成され返される。NopDisposable は、その名前の通り、dispose() されても何もしない Disposable である。
code:swift
extension Disposables {
static public func create() -> Disposable {
return NopDisposable.noOp
}
}
Disposables は既述の通り、unsubscribe 時に実行されるべき処理を実行する責務を持つ。unsubscrbe するというと、Observable から Observer の参照を除去するイメージだけど、それに該当するようなロジックは RxSwift が行ってくれているようなので、開発者が Disposables に渡すべきロジックは、処理がキャンセルされた場合に実行されるべき処理 である。例えば、API 通信等、実行が非同期であり、キャンセルされた時は中断すべきようなロジックが存在した場合には、Disposables にキャンセル時のロジックを受け渡す必要がある。 そのような特別な処理が入らない場合、単純な Observalbe の作成は以下のようになる。
code:swift
let observable: Observable<Int> = Observable.create { observer in
observer.on(.next(10))
return Disposables.create()
}
次は、Observable を監視し、通知を受け取った際に何らかの処理を実施する責務を担う Observer をどう定義するかになる。これは、単純に既存の Observable を subscribe するだけで良い。また、流れてきた Disposable を保持しておく DisposableBag も用意しておく必要がある。
code:swift
let disposeBag = DisposeBag()
let observable: Observable<Int> = Observable.create { observer in
observer.on(.next(10))
return Disposables.create()
}
observable
.subscribe({ event in
switch event {
case .completed:
print("completed")
case .error(_):
print("error")
case let .next(result):
print("next: \(result)") // next: 10
}
})
.disposed(by: disposeBag)
イベント通知を止める条件
Observable から Observer にイベント通知が飛ぶが、この時、.next 以外のイベントが飛んできたが場合、それ以降イベントは届かなくなる。この機構は ObserverBase<Element> に実装されている。
code:swift
class ObserverBase<Element> : Disposable, ObserverType {
private let _isStopped = AtomicInt(0)
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
.next 以外が流れてきた場合、上記の on(_:) メソッドにて、イベント通知がブロックされる。具象クラスである AnonymousObserver<Element> 側では、onCore でイベントを受け取る。
code:swift
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
Subject
Subject には、代表的なものに BehaviorSubject, PublicSubject がある。これらの振る舞いの違いを確認してみる。
PublishSubject は、Observer と Observable の両方の性質を持つ、という基本的な Subject の実装。流れてきたイベントを、そのまま保持している Observer に流す。
code:swift
public final class PublishSubject<Element>: Observable<Element>, SubjectType, ObserverType {
// 保持している状態
private var _isDisposed = false
private var _observers = Observers()
private var _stopped = false
private var _stoppedEvent = nil as Event<Element>?
public func on(_ event: Event<Element>) {
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<Element>) -> Observers {
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
}
BehaviorSubject は、PublishSubject に加えて、最後に受け取った値を保持する。そのため、時間とともに変化する値を抽象化したオブジェクト、とも呼ばれる。Observer は BehaviorSubject を subscribe することで、最新の値を通知で受け取ることができる (Push)。また、value によって能動的に最新の値を参照することもできる (Pull)。
code:swift
// **簡略化のため、だいぶ割愛**
public final class BehaviorSubject<Element>: Observable<Element>, SubjectType, ObserverType {
// 保持している状態
private var _isDisposed = false
private var _element: Element // <- これが PublishSubject にはなかった
private var _observers = Observers()
private var _stoppedEvent: Event<Element>?
public init(value: Element) {
self._element = value
}
// 現在の値を返す
public func value() throws -> Element {
return self._element
}
public func on(_ event: Event<Element>) {
switch event {
case .next(let element):
self._element = element
case .error, .completed:
self._stoppedEvent = event
}
dispatch(self._observers, event)
}
}
概要
RxRelay は Subject のシンプルなラッパーであり、下記の二点が変更されている。 Relay は complete しない
Relay はエラーを送出しない
すなわち、Relay は .next イベントのみを送出し、そして決して終了することはなく、通知は送り続けられる。
Subject 自体も有用だが、完了あるいは異常終了時にイベントが通知されなくなってしまう、という点でステートフルな存在だった。これはケースによっては困るため、Relay はこのようなステートを気にしないシンプルな構成となっている。
Subjects are useful to bridge the gap between non-Rx APIs. However, they are stateful in a damaging way: when they receive an onComplete or onError they no longer become usable for moving data. This is the observable contract and sometimes it is the desired behavior. Most times it is not.
Relays are simply Subjects without the aforementioned property. They allow you to bridge non-Rx APIs into Rx easily, and without the worry of accidentally triggering a terminal state.
主な利用ケースとしては、View/Model 間のデータバインディングが考えられる。データバインディングでは、その同期は終了せず常に試みられるのが望ましい。RxCocoa でも、データバインディングには RxRelay が利用されている。 bind
PublishRelay は PublishSubject の、BehaviorRelay は BehaviorSubject の簡単なラッパーである。コードを見ても、内部的に Subject を保持しているのみで、独自の振る舞いはほとんど見られない。
では、Relay独自の「completeしない」「エラーを送出しない」の2つの振る舞いはどこで実現されているのか?というと、Relay における subscribe に代わるメソッドである bind として実現されている。bind によって Observable に Relay を登録すると、Relay へは .complete や .error が無視され、.next のみが通知されるようになる。
code:swift
extension ObservableType {
public func bind(to relays: PublishRelay<Element>...) -> Disposable {
return bind(to: relays)
}
return subscribe { e in
switch e {
case let .next(element):
relays.forEach {
$0.accept(element)
}
case let .error(error):
rxFatalErrorInDebug("Binding error to publish relay: \(error)")
case .completed:
break
}
}
}
// BehaviorRelayも同様の定義がされている
}
概要
https://gyazo.com/bc1c1ec4c5b69358dcb708c458c765a6
Foundation の Rx 拡張
UIKit の各要素は、rx を プロキシして、RxSwift の機能を利用できる。基本的な利用方法は以下のようになる。 code:swift
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
hogeButton.rx.tap
// 何か処理
}
.disposed(by: disposeBag)
}
code:swift
public struct Reactive<Base> {
public let base: Base
public init(_ base: Base) {
self.base = base
}
}
そして、このプロキシを生やす ReactiveCompatible というプロトコルが用意されている。このプロトコルに適合したオブジェクトには Reactive プロキシが生える。そして、このプロトコルは NSObject に適合させられているため、UI 要素をはじめとした NSObject ベースの全てのオブジェクトに、Reactive プロキシが生えるようになっている。 code:swift
public protocol ReactiveCompatible {
associatedtype ReactiveBase
static var rx: Reactive<ReactiveBase>.Type { get set }
var rx: Reactive<ReactiveBase> { get set }
}
extension ReactiveCompatible {
public static var rx: Reactive<Self>.Type {
get { return Reactive<Self>.self }
set { }
}
public var rx: Reactive<Self> {
get { return Reactive(self) }
set { }
}
}
import class Foundation.NSObject
/// これで、NSObject に rx proxy が生える
extension NSObject: ReactiveCompatible { }
code:swift
// UIButton の Reactive プロキシの拡張
extension Reactive where Base: UIButton {
public var tap: ControlEvent<Void> {
return controlEvent(.touchUpInside)
}
}
code:swift
// 拡張されたプロパティを利用する場合
hogeButton.rx.tap
// 何か処理
}
.disposed(by: disposeBag)
rx 経由でどのような型のプロパティが露出しているのかは、UI要素毎に異なる。
Binder
rx プロキシ経由で、Binder という型のプロパティが生えている UI 要素がいくつかある。これは ObserverType の実装である。Observer なので、他の状態に依存してその値を変化させることができる。また、Binder は以下のルールが設けられている。
エラーをバインドしない
バインドが特定のスケジューラで行われることが保証される
デフォルトだと、main スケジューラが割り当てられる
UI 要素のための Observer であり、メインスレッドで実行することが保証されている。.error や .completed を送出しないところは Relay と似ているが、これは Observer の実装であり Subject ではない。
実装もかなり簡単で、複雑なことはしていない。
Traits (WIP)
概要
大抵のことは既存の Observable を利用すれば実現できるが、特定のユースケースで繰り返し使いたいような Observable の利用方法のパターンがいくつか存在する。これらを実現するために、Observable をラップ&拡張したのが Trait である。
Traits are simply a wrapper struct with a single read-only Observable sequence property.
RxSwift Traits
RxSwift の Traits は、いずれも 一度のみ イベントを送出する。これらはいずれも PremitiveSequence という型のエイリアスとなっている。 RxSwift において、Observable 自体は抽象クラスとして表現されている。Observable は ObservableType に適合していて、Observable への機能の拡張は、この ObservableType を extension して行われている。 code:Observable.swift
public class Observable<Element> : ObservableType {
public func subscribe<Observer: ObserverType>(_ observer: Observer)
-> Disposable where Observer.Element == Element
public func asObservable() -> Observable<Element> {
return self
}
}
code:ObservableType.swift
public protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer)
-> Disposable where Observer.Element == Element
}
extension ObservableType {
public func asObservable() -> Observable<Element>
}
上記のような Observable 及び ObservableType に対して、その拡張の方法は以下のように行われている。
code:Deffered.swift
extension ObservableType {
public static func deferred(_ observableFactory: ...)
}
code:Create.swift
extension ObservableType {
public static func create(_ subscribe: ...)
}
PrimitiveSequence に対する Trait の拡張方法も、似たような形式になっている。Observable をラップし保持しているのは PremitiveSequence であるが、Trait に応じて異なる振る舞いを拡張するのには PremitiveSequenceType という protocol が利用される。 code:swift
// Observableのラッパー
public struct PrimitiveSequence<Trait, Element> {
let source: Observable<Element>
}
public protocol PrimitiveSequenceType {
associatedtype Trait
associatedtype Element
// 自身をPremitiveSequenceに変換する
var primitiveSequence: PrimitiveSequence<Trait, Element> { get }
}
extension PrimitiveSequence: ObservableConvertibleType {
// 自身をObservableに変換する
public func asObservable() -> Observable<Element> {
return self.source
}
}
// Observableとしての振る舞い定義. Trait によって変わらない共通の振る舞い
extension PrimitiveSequence {
public static func deferred(_ ...)
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> PrimitiveSequence<Trait, Element>
// ...
}
code:Single.swift
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
extension PrimitiveSequenceType where Trait == SingleTrait {
public typealias SingleObserver = (SingleEvent<Element>) -> Void
public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element>
public func subscribe(_ observer: @escaping (SingleEvent<Element>) -> Void) -> Disposable
}
RxSwift 内で定義されている Traits には、以下のようなものがある。 Single:
1度のみ、要素かエラーのどちらかを送出する
利用例) 1度のリクエストに対し、成功か失敗かのみを返すHTTP リクエスト
その他、要素の無限のストリームを考えず単一の要素のみを考えれば良いようなケース
Completable:
1度のみ、完了, もしくはエラーのみを送出する
利用例) 値は必要なく、操作が完了したかどうかだけが知りたい場合
Maybe:
1度のみ、要素もしくは完了 or エラーのいずれかを送信する
副作用がない
利用例) 要素を出力できるが、必ずしも出力する必要はない場合
RxCocoa Traits (WIP)
RxCocoa の Traits は、計算資源を共有する のが特徴になっている。計算資源を共有する、と言われても正直なところピンとこないので、実装を確認してみる。 RxSwift の Traits と同様に、こちらの Traits も SharedSequence というとある型のエイリアスである。この SharedSequence も Observable を保持しているが、単なる Observable のラッパーではない。型引数として指定された SharingStrategyProtocol に適合した SharingStrategy の share(_:) 関数の適用後の Observable を保持することになる。SharingStrategyProtocol とは、その名の通り Observable の計算資源の共有方法を実装した型のための protocol である。 code:SharedSequence.swift
// SharedStrategyを型引数にとる
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element>
: SharedSequenceConvertibleType {
// SharedSequnce自体は、Observableの単純なラッパー
let _source: Observable<Element>
// 受け取ったObservableは、SharedStrategyにより計算資源を共有することになる
init(_ source: Observable<Element>) {
self._source = SharingStrategy.share(source)
}
}
public protocol SharingStrategyProtocol {
// Sequence間で共通で利用するスケジューラー
static var scheduler: SchedulerType { get }
// 計算資源の共有方法の実装
static func share<Element>(_ source: Observable<Element>) -> Observable<Element>
}
Driver の実装を覗いてみると、以下がわかる。
Driver は DriverSharingStrategy を利用して計算資源を共有する
DriverSharingStragety は、デフォルトの Main スレッドのスケジューラを利用する
DriverSharingStrategy は、share(replay:scope:) 関数を Observable に適用する
code:Driver.swift
// Driverは、DriverSharingStrategyを利用して計算資源を共有するSharedSequence
public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>
public struct DriverSharingStrategy: SharingStrategyProtocol {
// 利用するスケジューラは、デフォルトだとMainスレッドのスケジューラ
public static var scheduler: SchedulerType { SharingScheduler.make() }
//
public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
return source.share(replay: 1, scope: .whileConnected)
}
}
extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy {
/// Adds asDriver to SharingSequence with DriverSharingStrategy.
public func asDriver() -> Driver<Element> {
return self.asSharedSequence()
}
}
code:Signal.swift
public typealias Signal<Element> = SharedSequence<SignalSharingStrategy, Element>
public struct SignalSharingStrategy: SharingStrategyProtocol {
// Mainスケジューラ
public static var scheduler: SchedulerType { SharingScheduler.make() }
public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
return source.share(scope: .whileConnected)
}
}
extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy {
/// Adds asPublisher to SharingSequence with PublishSharingStrategy.
public func asSignal() -> Signal<Element> {
return self.asSharedSequence()
}
}
Driver:
目的: UI層でReactiveなコードを書くための直感的な方法を提供する
エラーを送出しない
監視が Main スレッドで行われる
副作用がある
具体的にいうと、状態を持つ。BehaviorRelay と同様に、流れてきた値のうち最新の1つを保持し、subscribeされたタイミングでそれを流す
Signal:
エラーを送出しない
監視が Main スレッドで行われる
計算資源を共有する
こちらは replay しない
素晴らしい。
WIP