// Lookup members: each takes a URI identifier and returns the corresponding
// observable, observer, or subscription object.
// NOTE(review): generic type arguments appear to be missing from these
// signatures (the lambdas below, `x >= 0` and `-x`, imply a numeric element
// type) — confirm against the original declarations before compiling.
IAsyncReactiveQbservable GetObservable(Uri observableId);
IAsyncReactiveQbserver GetObserver(Uri observerId);
IAsyncReactiveQubscription GetSubscription(Uri subscriptionId);

// Usage example. The comment under each statement sketches ("~" = roughly)
// the expression tree carried by the value just declared: every operator
// call wraps the previous expression in another Expression.Call node.
var numbers = ctx.GetObservable(new Uri("eg://numbers"));
// numbers.Expression
//   ~ Expression.Parameter("eg://numbers")

var positive = numbers.Where(x => x >= 0);
// positive.Expression
//   ~ Expression.Call(
//       Where,
//       Expression.Parameter("eg://numbers"),
//       Expression.Lambda(...)
//     )

var negative = positive.Select(x => -x);
// negative.Expression
//   ~ Expression.Call(
//       Select,
//       Expression.Call(
//         Where,
//         Expression.Parameter("eg://numbers"),
//         Expression.Lambda(...)
//       ),
//       Expression.Lambda(...)
//     )

// Subscribes to the composed query under the subscription URI "eg://sub/neg".
// NOTE(review): `...` is an elided argument carried over from the original
// snippet; it must be replaced with real arguments before this compiles.
await negative.SubscribeAsync(new Uri("eg://sub/neg"), ...);