Observers
オブザーバーはイベントのシンクとして機能します。RxとIRPの関係は以下の通りです。
Observers act as sinks for events. The relationship between Rx and IRP is as follows:
code:C#
// Rx
interface IObserver<T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
// IRP
interface IAsyncReactiveQbserver<in T> : IExpressible
{
Task OnNextAsync(T value, CancellationToken token = default);
Task OnErrorAsync(Exception error, CancellationToken token = default);
Task OnCompletedAsync(CancellationToken token = default);
}
繰り返しになりますが、これらのインターフェースは、タスクの戻り値の型を使用しても同型です。
Again, these interfaces are isomorphic modulo the use of Task return types.
このインターフェイスの設計は、将来的に強化される可能性のある ValueTask の導入に先立って行われました。実際、IAsyncEnumerable<T>との二重性、特にキャンセルの概念については再考する価値があるでしょう。これらのインターフェイスでのキャンセルのサポートは、境界を越えてイベントを公開する際のI/O操作のキャンセルをサポートするためです。
The design of this interfae predates the introduction of ValueTask which could provide a future enhancement. In fact, it would be worth reconsidering the duality with IAsyncEnumerable<T>, especially around notions of cancellation. The support for cancellation on these interfaces is to support cancellation of I/O operations when publishing events across boundaries.
On*メソッドには、イベントに識別子を関連付けるためのUriパラメータがないことに注意してください。イベントペイロードからイベント識別子を抽出する方法(および抽出するかどうか)については、いくつかの規約を定めて IRP システムに決定させる方が簡単であると考えられました(このドキュメントの前の方で説明したデータモデルを参照)。振り返ってみると、これは OnError と OnCompleted の区切りとなるイベントには役立ちません。したがって、将来の IRP の反復は、(外部識別子を持つ)再帰的な通知オブジェクトが代わりに使用される、より原始的な層の上に、オブザーバーのような化粧板を提供するかもしれません。
Note that the On* methods lack a Uri parameter to associate an identifier with an event. It was deemed easier to let IRP systems decide on how (and if) they extract event identifiers from event payloads by establishing some conventions (cf. the Data Model described earlier in this document). In retrospect, this doesn’t help for the punctuation events of OnError and OnCompleted, so a future iteration of IRP may provide an observer-like veneer on top of a more primitive layer where reified notification objects (with extrinsic identifiers) are used instead:
code:C#
// Rx
class Notification<T>
{
public NotificationKind Kind { get; }
public bool hasValue { get; }
public T Value { get; }
public Exception Error { get; }
void Accept(IObserver<T> observer);
}
// IRP (future)
class AsyncReactiveNotification<T>
{
public Uri NotificationId { get; }
public NotificationKind Kind { get; }
public bool HasValue { get; }
public T Value { get; }
public Exception Error { get; }
Task AcceptAsync(IAsyncObserver<T> observer, CancellationToken token = default);
}
オブザーバーは、最初に思ったよりも「ホット」であることに注意してください。ストリームがホットであると考えられていることから、オブザーバがストリームでバックアップされていればホットであることは明らかです。しかし、オブザーバーの単一インスタンスであっても、終了(エラー、完了)や、提示されたイベントの順次再生可能な処理についての状態を維持していることから、ホットであると言えます。そのため、将来のイテレーションやIRPでは、「オブザーバーファクトリー」という概念が導入される可能性が高いです。これにより、サブスクリプションのコンテキスト外でのオブザーバーのインスタンス化が可能になります(例えば、信頼性の高いキュー、データベースなどの様々な物理的な実装に裏打ちされたシンク上の抽象化レイヤーとしてIPPシステムを使用し、これらの上で統一されたオブザーバーインターフェースを使用する)。サブスクリプションのコンテキストでインスタンス化された場合、ホットオブザーバーインスタンスの寿命はそのサブスクリプションに関連付けられます。
Note that observers are “hotter” than one may think at first. It’s clear that an observer is hot if it’s backed by a stream, given that streams are deemed to be hot. However, even single instances of observers are hot given that they maintain state about termination (error, completion) and about the sequential relaible processing of the events presented to them. As such, a future iteration or IRP will most likely introduce the notion of “observer factories”. This will enable instantiation of observers outside the context of a subscription (e.g. to use an IRP system as an abstraction layer over sinks backed by various physical implementations such as reliable queues, databases, etc. with a unified observer interface over these). When instantiated in the context of a subscription, the lifetime of a hot observer instance is tied to that subscription.