R3 — A New Modern Reimplementation of Reactive Extensions for C#

Yoshifumi Kawai
Mar 5, 2024

Recently, I officially released R3 as a new implementation of Reactive Extensions for C#! R3 is named as the third generation of Rx, considering Rx for .NET as the first generation and UniRx as the second. The core part of Rx (almost identical to dotnet/reactive) is provided as a library common to .NET, while custom schedulers and operators specific to each platform are separated into different libraries. This approach allows us to offer a core library for all .NET platforms and extension libraries for various frameworks such as Unity, Godot, Avalonia, WPF, WinForms, WinUI3, Stride, LogicLooper, MAUI, MonoGame and Blazor.

While it includes some breaking changes and is not a drop-in replacement, transitioning from dotnet/reactive or UniRx is kept realistically manageable. This is part of the beauty of Rx, where vocabulary and operations are largely standardized in a LINQ-like manner, meaning the transition may not seem significantly different.

The UniRx I had previously developed was an Rx exclusively for Unity. Therefore, R3 provides extensive support for Unity, offering sufficient functionality to serve as a migration destination from UniRx. Additionally, by becoming a general-purpose library, it now supports many scenarios for use with .NET. It is not an Rx specialized for use with game engines, but rather, it has been created as a new implementation of Reactive Extensions.

The History of Rx and vs. async/await

Are you using Rx? The number of people answering “no” is increasing, not just in .NET or Unity, but in Java, Swift, and Kotlin as well. Its presence is clearly declining. Why? The answer is simple: the advent of async/await. Reactive Extensions for .NET first appeared in 2009, during the era of C# 3.0 and .NET Framework 3.5, a time when platforms like Silverlight and Windows Phone, now defunct, were still relevant. async/await (introduced in C# 5.0, 2012) didn’t exist yet, and even Tasks had not been introduced. As a side note, the “Extensions” in Reactive Extensions were named after the earlier Parallel Extensions project, which included Parallel LINQ and the Task Parallel Library added in .NET Framework 4.0.

Initially, Rx spread across various languages as the definitive solution for asynchronous processing without language support, offering a powerful and user-friendly alternative to single-function Task or Promise. I, too, was mesmerized by Rx over TPL at the time. However, the landscape changed dramatically with the introduction of async/await, establishing it as the standard for asynchronous processing across numerous languages.

With the widespread adoption of async/await, the need for Rx just for asynchronous processing diminished, leading to a decline in its adoption rate. As the developer of UniRx, a standard for Rx in Unity, I quickly recognized the need for an async/await runtime tailored to game engines (Unity) and developed UniTask as soon as the necessary conditions (C# 7.0) were met in Unity.

Rediscovering the Value of Rx

Rx is not just for asynchronous processing, right? While it was hailed as “LINQ to Everything,” the notion of “Everything” might be noise, and it’s better to separate concerns and use the most optimal tools. Using Rx just for async processing is not the best approach; a single-value Observable should be represented by a Task for both clarity and performance benefits. This necessitates the integration of Rx with async/await through APIs that can coexist with asynchronous tasks, rather than focusing on minor details like being able to pass a Task to SelectMany because Observables are monads.

Simply being able to await is not sufficient for real-world application development. Various libraries have been devised for asynchronous/parallel processing, not just Rx, such as TPL Dataflow. However, few people would choose to use these libraries from scratch today. It’s now 2024, and the winners have been decided: language-supported IAsyncEnumerable and System.Threading.Channels are the best choices. These also incorporate backpressure characteristics, making operators related to backpressure in RxJava unnecessary for .NET. For more specific I/O operations, System.IO.Pipelines offers maximum performance.

Asynchronous LINQ might be a nice addition, but given its lower usage frequency compared to LINQ to Objects in actual asynchronous stream scenarios, it’s not something to be eagerly adopted (note that I have implemented UniTaskAsyncEnumerable and LINQ for UniTask myself). The dream of distributed queries (IQbservable) in Rx might have found its modern counterpart in GraphQL. In terms of distributed systems, Kubernetes has gained widespread adoption, with gRPC becoming the standard for RPC, and other options like Orleans, Akka.NET, SignalR, and MagicOnion offering a variety of choices.

The landscape is no longer the same as in 2009, where various technologies competed for dominance. Just as no one would choose Service Fabric today, venturing into distributed processing is not the future of Rx, in my opinion. Just because Rx was created by the Cloud Programmability Team doesn’t mean that making it useful for the cloud is the only correct approach. Of course, there could be multiple futures, and I hope one possible future for Rx is R3.

So, where does the value of Rx lie? I believe it returns to its roots: processing in-memory messaging with LINQ, or LINQ to Events. Especially on the client side and in UI processing, Rx continues to be valued, with Rx-like but more language-optimized options like Kotlin Flow and Swift Combine still actively used. Even in complex, event-heavy game applications, as a developer of UniRx used in the game engine (Unity), I find it extremely beneficial. The significance of the observer pattern and events is undeniable, and Rx’s role as a “better event” or the ultimate observer pattern remains unchanged.

Reconstruction with R3

Initially, I debated whether to maintain 100% compatibility with Rx interfaces while removing legacy APIs and adding new ones, or to fundamentally change them. However, to solve all the issues I perceive, radical changes were necessary. Inspired by the success of Kotlin Flow and Swift Combine, I decided to reconstruct Rx completely anew, tailored to the modern C# environment of .NET 8 and C# 12.

Even so, the differences in interfaces are not that significant in the end.

public abstract class Observable<T>
{
    public IDisposable Subscribe(Observer<T> observer);
}

public abstract class Observer<T> : IDisposable
{
    public void OnNext(T value);
    public void OnErrorResume(Exception error);
    public void OnCompleted(Result result); // Result is (Success | Failure)
}

At a glance, the main changes are the transformation of OnError into OnErrorResume and the shift from interface to abstract class. One of the changes I felt was necessary was OnError, where the behavior of unsubscribing due to exceptions in the pipeline was considered a billion-dollar mistake in Rx. In R3, exceptions flow to OnErrorResume without unsubscribing. Instead, the pipeline’s termination is indicated by passing a Result representing Success or Failure to OnCompleted.

The definitions of IObservable<T>/IObserver<T> are closely related to those of IEnumerable<T>/IEnumerator<T>, and they are claimed to be a mathematical duality, but there are practical inconveniences, with the most significant being that it stops on OnError. The inconvenience stems from the different lifetimes of exceptions in IEnumerable<T>'s foreach and IObservable<T>. While an exception in foreach ends the iteration there, and if necessary, is handled with try-catch, often without retrying, subscribing to an Observable is different. The lifespan of an event subscription is long, and it's not unnatural to want it not to stop even if an exception occurs. Normal events do not stop when an exception occurs, but in Rx, due to the operator chain, there's always a possibility of exceptions occurring in the pipeline (e.g., Select or Where might throw exceptions). When considered as an alternative or superior to events, it becomes unnatural for it to stop due to exceptions.

And it’s not just about catching and retrying if needed! Re-subscribing to a stopped event in Rx is very difficult! Unlike events, Observables have a concept of completion. Subscribing to a completed IObservable immediately calls OnError | OnCompleted, thus automatic re-subscription risks re-subscribing to a completed sequence. Of course, this would lead to an infinite loop, without a way to detect and handle it properly. There are many questions on Stack Overflow about how to re-subscribe to UI subscriptions in Rx/Combine/Flow, and the answers often require writing very complex code. Reality is not solved with Repeat/Retry alone!

Therefore, it was changed to not stop on exceptions. To avoid confusion with the traditional stopping behavior, it was renamed to OnErrorResume. This solves all issues related to re-subscription. Moreover, this change has advantages; changing from stopping to not stopping is impossible (as the Dispose chain would run, making it impossible to restore state, leaving no means other than total re-subscription), but changing from not stopping to stopping is very easy to implement and performs well. Just prepare an operator that converts OnErrorResume to OnCompleted(Result.Failure) (a standard operator OnErrorResumeAsFailure has been added).
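To make the difference concrete, here is a minimal sketch of the idea in plain C#. It is not R3's implementation: `MiniObserver<T>`, `MiniOperators`, and `ErrorAsFailure` are hypothetical names invented for illustration, and a nullable `Exception` stands in for R3's `Result`. It shows a `Select` whose selector exceptions are reported without ending the subscription, plus a small adapter that opts back into stopping on the first error.

```csharp
#nullable enable
using System;

// Hypothetical miniature observer used only for this sketch; it is not R3's Observer<T>.
public sealed class MiniObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onErrorResume;
    private readonly Action<Exception?> onCompleted; // null means Success, non-null means Failure

    public MiniObserver(Action<T> onNext, Action<Exception> onErrorResume, Action<Exception?> onCompleted)
    {
        this.onNext = onNext;
        this.onErrorResume = onErrorResume;
        this.onCompleted = onCompleted;
    }

    public bool IsStopped { get; private set; }

    public void OnNext(T value) { if (!IsStopped) onNext(value); }

    // Reports the error but leaves the subscription alive.
    public void OnErrorResume(Exception error) { if (!IsStopped) onErrorResume(error); }

    // The only terminal signal: after this, nothing else is delivered.
    public void OnCompleted(Exception? failure)
    {
        if (IsStopped) return;
        IsStopped = true;
        onCompleted(failure);
    }
}

public static class MiniOperators
{
    // A Select whose selector exceptions flow to OnErrorResume instead of unsubscribing.
    public static MiniObserver<TSource> Select<TSource, TResult>(
        MiniObserver<TResult> downstream, Func<TSource, TResult> selector)
    {
        return new MiniObserver<TSource>(
            value =>
            {
                TResult projected;
                try { projected = selector(value); }
                catch (Exception ex) { downstream.OnErrorResume(ex); return; }
                downstream.OnNext(projected);
            },
            downstream.OnErrorResume,
            downstream.OnCompleted);
    }

    // Opts back into the classic behaviour: the first error terminates with a failure.
    public static MiniObserver<T> ErrorAsFailure<T>(MiniObserver<T> downstream)
    {
        return new MiniObserver<T>(
            downstream.OnNext,
            error => downstream.OnCompleted(error),
            downstream.OnCompleted);
    }
}
```

Feeding 5, 0, 2 through `Select(sink, x => 10 / x)` delivers a value, an error, and then another value. Wrapping the sink in `ErrorAsFailure` first makes the same input end at the division by zero.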

Rx itself has complex contracts (for example, either OnError or OnCompleted is issued, but never both), yet its interfaces provide no implementation guarantees, which makes it difficult to implement custom operators correctly. For instance, correctly handling the Disposable returned when Subscribe is delayed (using SingleAssignmentDisposable) is hard to understand properly. Where do exceptions occurring in onNext during Subscribe go: to onError, followed by disposal, or does the sequence continue? This behavior is not specifically regulated, so implementations can vary. By becoming an abstract class, R3 enforces most contracts in the base class, unifying behavior and easing custom implementations.

The primary reason for making it an abstract class was to centralize management of all subscriptions. All Subscribes must go through the base class’s Subscribe implementation, enabling tracking of subscriptions. For example, the active subscriptions can be displayed as follows:

[Screenshot: Observable Tracker window in the Unity editor]

This is an extension window for Unity, but one also exists for Godot, and the same information is offered as an API, so it can be logged or retrieved at any time, making custom visualization possible.

While Task has Parallel Debugger (which also centralizes management in the base class when s_asyncDebuggingEnabled), visualizing Rx subscriptions is far more critical. Event subscription leaks are common, and developers often scramble to find them at the end of development, but with R3, this is no longer necessary! Significantly improved development efficiency!
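The mechanism can be sketched in a few lines of plain C#. This is an illustrative model only, not R3's Observable Tracker: `MiniTracker`, `TrackedSource<T>`, and `NeverSource<T>` are hypothetical names. The point it shows is that a non-virtual `Subscribe` on the base class is a single choke point where every subscription can be registered, and removed again on Dispose.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical registry of live subscriptions.
public static class MiniTracker
{
    private static readonly ConcurrentDictionary<int, string> active = new();
    private static int nextId;

    public static int Count => active.Count;
    public static List<string> Snapshot() => new List<string>(active.Values);

    internal static IDisposable Track(string description, IDisposable inner)
    {
        var id = Interlocked.Increment(ref nextId);
        active[id] = description;
        return new Tracked(id, inner);
    }

    private sealed class Tracked : IDisposable
    {
        private readonly int id;
        private IDisposable? inner;

        public Tracked(int id, IDisposable inner) { this.id = id; this.inner = inner; }

        public void Dispose()
        {
            // Idempotent: only the first Dispose removes the entry and disposes the inner subscription.
            var target = Interlocked.Exchange(ref inner, null);
            if (target == null) return;
            active.TryRemove(id, out _);
            target.Dispose();
        }
    }
}

public abstract class TrackedSource<T>
{
    // Non-virtual on purpose: derived classes cannot bypass the tracking.
    public IDisposable Subscribe(Action<T> onNext)
    {
        var inner = SubscribeCore(onNext);
        return MiniTracker.Track($"{GetType().Name} -> {typeof(T).Name}", inner);
    }

    protected abstract IDisposable SubscribeCore(Action<T> onNext);
}

// A source that never emits; it exists only to exercise the tracker.
public sealed class NeverSource<T> : TrackedSource<T>
{
    protected override IDisposable SubscribeCore(Action<T> onNext) => new NoopDisposable();

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}
```

A subscription that is never disposed stays visible in `MiniTracker.Snapshot()`, which is the kind of information that makes leaks easy to spot.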

R3 prioritizes subscription management and leak prevention, tracking all subscriptions with Observable Tracker and introducing the concept that “all Observables can complete.”

The basic principle of subscription management in Rx is disposing of IDisposable. However, unsubscribing is not limited to this; it can also be done by flowing OnError | OnCompleted (not guaranteed by the IObservable contract but implemented as such, and R3 ensures it will always be so through the base class). Thus, handling leaks from both upstream (issuance of OnError | OnCompleted) and downstream (Dispose) can more reliably prevent leaks.

While this may seem excessive, experience in developing actual applications suggests that excessive subscription management is just right. From this philosophy, R3 has made Observables like Observable.FromEvent, Observable.Timer, EveryUpdate, which previously had no means to issue OnCompleted, able to do so. The method of issuance is by passing a CancellationToken, leveraging the widely (or excessively) used CancellationToken in the modern API design post-async/await. Additionally, with the idea that all Observables can complete, disposing of a Subject now standardly issues OnCompleted.
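As a rough illustration of completing an event subscription through a CancellationToken, here is a sketch that uses only the base class library. It is not R3's `Observable.FromEvent`; `CompletableEvent` and its parameters are hypothetical names chosen for this example. Cancelling the token plays the role of the upstream completion, and Dispose plays the role of the downstream one.

```csharp
#nullable enable
using System;
using System.Threading;

public static class CompletableEvent
{
    // addHandler/removeHandler attach to and detach from any event-like source.
    public static IDisposable Subscribe<T>(
        Action<Action<T>> addHandler,
        Action<Action<T>> removeHandler,
        Action<T> onNext,
        Action onCompleted,
        CancellationToken cancellationToken)
    {
        return new Subscription<T>(addHandler, removeHandler, onNext, onCompleted, cancellationToken);
    }

    private sealed class Subscription<T> : IDisposable
    {
        private readonly Action<Action<T>> removeHandler;
        private readonly Action<T> onNext;
        private readonly CancellationTokenRegistration registration;
        private int stopped;

        public Subscription(
            Action<Action<T>> addHandler,
            Action<Action<T>> removeHandler,
            Action<T> onNext,
            Action onCompleted,
            CancellationToken cancellationToken)
        {
            this.removeHandler = removeHandler;
            this.onNext = onNext;
            addHandler(onNext);

            // Upstream termination: cancelling the token detaches the handler and signals completion.
            registration = cancellationToken.Register(() =>
            {
                if (TryDetach()) onCompleted();
            });
        }

        // Downstream termination: Dispose detaches without signalling completion.
        public void Dispose()
        {
            TryDetach();
            registration.Dispose();
        }

        private bool TryDetach()
        {
            if (Interlocked.Exchange(ref stopped, 1) != 0) return false;
            removeHandler(onNext);
            return true;
        }
    }
}
```

Whichever side ends the subscription first wins, and the handler is detached exactly once.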

Reconsidering IScheduler

IScheduler is the mechanism that enables the magic of moving through time and space in Rx. By passing it to Timer or ObserveOn, you can move values to any place (Thread, Dispatcher, PlayerLoop, etc.) and time.

public interface IScheduler
{
    DateTimeOffset Now { get; }

    IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
    IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action);
}

And, it turns out to be flawed. If you have ever looked at the Rx source code, you may have noticed that from the beginning, an additional, different definition has been prepared. For example, ThreadPoolScheduler implements interfaces like the following.

public interface ISchedulerLongRunning
{
    IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action);
}

public interface ISchedulerPeriodic
{
    IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action);
}

public interface IStopwatchProvider
{
    IStopwatch StartStopwatch();
}

public abstract partial class LocalScheduler : IScheduler, IStopwatchProvider, IServiceProvider
{
}

public sealed class ThreadPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
{
}

And the following calls are made.

public static IStopwatch StartStopwatch(this IScheduler scheduler)
{
    var swp = scheduler.AsStopwatchProvider();
    if (swp != null)
    {
        return swp.StartStopwatch();
    }

    return new EmulatedStopwatch(scheduler);
}

private static IDisposable SchedulePeriodic_<TState>(IScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
{
    var periodic = scheduler.AsPeriodic();
    if (periodic != null)
    {
        return periodic.SchedulePeriodic(state, period, action);
    }

    var swp = scheduler.AsStopwatchProvider();
    if (swp != null)
    {
        var spr = new SchedulePeriodicStopwatch<TState>(scheduler, state, period, action, swp);
        return spr.Start();
    }
    else
    {
        var spr = new SchedulePeriodicRecursive<TState>(scheduler, state, period, action);
        return spr.Start();
    }
}

In essence, there are quite a few cases where the raw IScheduler is not used. The reason is performance: IScheduler.Schedule is defined only for single executions, and repeated work is expected to call it recursively, but generating a new IDisposable on every call is costly. To avoid this, ISchedulerPeriodic and the other interfaces were prepared.

In that case, wouldn’t it be better to use something that properly reflects the reality rather than IScheduler? This led to the discovery that TimeProvider, added in .NET 8, can do what IScheduler did more efficiently.

public abstract class TimeProvider
{
    // use these.
    public virtual ITimer CreateTimer(TimerCallback callback, object? state, TimeSpan dueTime, TimeSpan period);
    public virtual long GetTimestamp();
}

The ITimer generated by CreateTimer has sufficient functionality to perform what ISchedulerPeriodic can do, and in scenarios where one-time executions are repeated (Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)), using ITimer is more efficient than dotnet/reactive's ThreadPoolScheduler (which creates a new Timer each time).
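The following sketch shows what re-using a single ITimer for repeated one-shot executions can look like. It relies only on the .NET 8 `TimeProvider.CreateTimer` and `ITimer.Change` APIs; the `RepeatingOneShot` class is a hypothetical helper written for this example and is not taken from R3's source.

```csharp
#nullable enable
using System;
using System.Threading;

// Runs an action after dueTime and, if the action asks for it, re-arms the same timer again.
public sealed class RepeatingOneShot : IDisposable
{
    private readonly ITimer timer;
    private readonly TimeSpan dueTime;
    private readonly Func<int, bool> action; // receives the run count; returns true to run once more
    private int count;

    public RepeatingOneShot(TimeProvider timeProvider, TimeSpan dueTime, Func<int, bool> action)
    {
        this.dueTime = dueTime;
        this.action = action;

        // Created in a stopped state (infinite due time); Start arms it.
        timer = timeProvider.CreateTimer(
            static state => ((RepeatingOneShot)state!).Tick(),
            this,
            Timeout.InfiniteTimeSpan,
            Timeout.InfiniteTimeSpan);
    }

    public void Start() => timer.Change(dueTime, Timeout.InfiniteTimeSpan);

    private void Tick()
    {
        // Re-arm the existing timer instead of allocating a new timer and IDisposable per run.
        if (action(++count))
        {
            timer.Change(dueTime, Timeout.InfiniteTimeSpan);
        }
    }

    public void Dispose() => timer.Dispose();
}
```

Because the timer is only re-armed after the action returns, runs never overlap, and the allocation cost is paid once in the constructor.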

Regarding acquisition of the current time, TimeProvider also has DateTimeOffset TimeProvider.GetUtcNow(), similar to DateTimeOffset IScheduler.Now, but R3 only uses long GetTimestamp(). The reason is that only ticks are necessary for operator implementation, so it's better to avoid the overhead of wrapping them in DateTimeOffset and to handle raw ticks directly for time calculations.

DateTimeOffset.UtcNow can be affected by changes to the OS system time, so it's better to use GetTimestamp (which uses a high-resolution timer from Stopwatch.GetTimestamp() as standard) without going through DateTimeOffset for that reason as well.
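A small sketch of timestamp-based time calculation, using only `TimeProvider.GetTimestamp`, `TimestampFrequency`, and `GetElapsedTime` from .NET 8. Both `ManualTimeProvider` and `IntervalGate` are hypothetical types written for this illustration; the manual provider simply makes the clock controllable so the gate's behaviour is deterministic.

```csharp
using System;

// Hypothetical provider whose timestamp only moves when told to.
public sealed class ManualTimeProvider : TimeProvider
{
    private long timestamp;

    public override long TimestampFrequency => 1000; // one timestamp tick per millisecond
    public override long GetTimestamp() => timestamp;

    public void AdvanceMilliseconds(long milliseconds) => timestamp += milliseconds;
}

// Lets a caller through at most once per interval, comparing raw timestamps only.
public sealed class IntervalGate
{
    private readonly TimeProvider timeProvider;
    private readonly TimeSpan interval;
    private long lastPassed;
    private bool hasPassed;

    public IntervalGate(TimeProvider timeProvider, TimeSpan interval)
    {
        this.timeProvider = timeProvider;
        this.interval = interval;
    }

    public bool TryPass()
    {
        var now = timeProvider.GetTimestamp();
        if (hasPassed && timeProvider.GetElapsedTime(lastPassed, now) < interval)
        {
            return false; // still inside the interval
        }

        lastPassed = now;
        hasPassed = true;
        return true;
    }
}
```

No DateTimeOffset is created anywhere, and a change to the wall clock cannot affect the result because the gate never looks at it.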

Another problem with IScheduler is the existence of synchronously operating schedulers like ImmediateScheduler and CurrentThreadScheduler. Assigning time-related processes like Timer or Delay to these results in emulating asynchronous behavior in a way that should never be used, i.e., by sleeping the thread. Therefore, it might be better not to have synchronous schedulers at all. In R3, they were completely removed, and specifying a TimeProvider always means making asynchronous calls.

The problem with ImmediateScheduler and CurrentThreadScheduler is not just that, but also that their performance is critically poor.

Result of Observable.Range(1, 10000).Subscribe()

That CurrentThreadScheduler is slow may be expected, but the poor result of ImmediateScheduler might be counterintuitive. The ImmediateScheduler in dotnet/reactive does new AsyncLockScheduler() on every call to Schedule, and the constructor of the base class LocalScheduler, which AsyncLockScheduler invokes, calls SystemClock.Register, which in turn takes a lock, creates a new WeakReference<LocalScheduler>(scheduler), and does a HashSet.Add. It's no wonder the performance is bad (for recursive calls the cost is limited to generating a SingleAssignmentDisposable each time, which is still a lot).

You might think it’s okay because Range is rarely used, but ImmediateScheduler is actually used quite often in unexpected places. A typical example is Merge, which uses ImmediateScheduler when IScheduler is unspecified, so if code subscribes frequently and repeatedly, it may be called a considerable number of times. In fact, when I used dotnet/reactive in a server application, Merge and ImmediateScheduler once accounted for a significant portion of the server's memory usage. At that time, I got by with a custom lightweight scheduler, specifying it directly and thoroughly avoiding ImmediateScheduler. If there is a next dotnet/reactive, improving the performance of ImmediateScheduler should be the first thing to do.

The reason for doing SystemClock.Register seems to be for monitoring changes to the system time with DateTimeOffset.UtcNow. In other words, had we used long(timestamp) from the start, we wouldn't have invited such critical performance degradation. This is also one of the reasons for the failure in defining the IScheduler interface.

By adopting TimeProvider, it’s also worth noting that unit testing has become easier with standard methods using Microsoft.Extensions.Time.Testing.FakeTimeProvider.

FrameProvider

One thing that is not present in other Rx libraries, but has been immensely effective in UniRx, is the frame-based operator. These include operators like DelayFrame that executes after a set number of frames, NextFrame for execution in the next frame, EveryUpdate as a factory that emits every frame, and EveryValueChanged for monitoring values every frame, all of which are convenient for use in game engines.

What I realized is that time and frames are conceptually similar, and that these concepts exist not just in game engines but across various frameworks, for example in UI processes with message loops and rendering loops. Therefore, in R3, we abstracted frame-based processing in the form of FrameProvider, complementing TimeProvider. This allows the frame-based operators, previously only available to Unity, to work across any framework that supports C# (WinForms, WPF, WinUI3, MAUI, Godot, Avalonia, Stride, etc…).

public abstract class FrameProvider
{
    public abstract long GetFrameCount();
    public abstract void Register(IFrameRunnerWorkItem callback);
}

public interface IFrameRunnerWorkItem
{
    // true, continue
    bool MoveNext(long frameCount);
}
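To illustrate how little a frame loop needs to provide, here is a self-contained sketch. The two base types are re-declared locally with the same shape as the definitions above so that the snippet compiles on its own; `ManualFrameProvider` and `RunAfterFrames` are hypothetical names, and a real platform provider would call the equivalent of `Advance` from its rendering or update loop.

```csharp
using System;
using System.Collections.Generic;

// Re-declared locally so the sketch compiles without referencing R3.
public abstract class FrameProvider
{
    public abstract long GetFrameCount();
    public abstract void Register(IFrameRunnerWorkItem callback);
}

public interface IFrameRunnerWorkItem
{
    bool MoveNext(long frameCount); // true: keep running on the next frame
}

// Hypothetical provider whose frames are advanced by hand.
public sealed class ManualFrameProvider : FrameProvider
{
    private readonly List<IFrameRunnerWorkItem> items = new();
    private long frameCount;

    public override long GetFrameCount() => frameCount;
    public override void Register(IFrameRunnerWorkItem callback) => items.Add(callback);

    public void Advance()
    {
        frameCount++;

        // Work on a snapshot so items registered during this frame first run on the next one.
        var snapshot = items.ToArray();
        items.Clear();
        foreach (var item in snapshot)
        {
            if (item.MoveNext(frameCount)) items.Add(item);
        }
    }
}

// Work item in the spirit of DelayFrame: invokes the action once the given number of frames has run.
public sealed class RunAfterFrames : IFrameRunnerWorkItem
{
    private readonly Action action;
    private int remaining;

    public RunAfterFrames(int frames, Action action)
    {
        remaining = frames;
        this.action = action;
    }

    public bool MoveNext(long frameCount)
    {
        if (--remaining > 0) return true;
        action();
        return false; // done: the provider drops this item
    }
}
```

Registering `new RunAfterFrames(2, callback)` and calling `Advance()` twice invokes the callback on the second frame, after which the item is no longer scheduled.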

In R3, for every operator that requires a TimeProvider, we implemented a corresponding ***Frame operator.

  • Return <-> ReturnFrame
  • Yield <-> YieldFrame
  • Interval <-> IntervalFrame
  • Timer <-> TimerFrame
  • Chunk <-> ChunkFrame
  • Debounce <-> DebounceFrame
  • Delay <-> DelayFrame
  • DelaySubscription <-> DelaySubscriptionFrame
  • ObserveOn(TimeProvider) <-> ObserveOn(FrameProvider)
  • Replay <-> ReplayFrame
  • Skip <-> SkipFrame
  • SkipLast <-> SkipLastFrame
  • SubscribeOn(TimeProvider) <-> SubscribeOn(FrameProvider)
  • Take <-> TakeFrame
  • TakeLast <-> TakeLastFrame
  • ThrottleFirst <-> ThrottleFirstFrame
  • ThrottleFirstLast <-> ThrottleFirstLastFrame
  • ThrottleLast <-> ThrottleLastFrame
  • Timeout <-> TimeoutFrame

async/await Integration

First, we thoroughly eliminated Observables that return a single value, which are seen as a bad practice in existing Rx. These should be handled with async/await instead, as operators that return or expect a single value can introduce noise that leads to bad practices. First becomes FirstAsync, returning a Task<T>. AsyncSubject is removed; please use TaskCompletionSource instead.

Moreover, current C# code often involves asynchronous code, but fundamentally, Rx only accepts synchronous code. Carelessly, this could lead to a FireAndForget situation, and simply mixing it with SelectMany is not sufficient. Thus, we introduced special methods for Where/Select/Subscribe.

  • SelectAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask<TResult>> selector, AwaitOperation awaitOperation = Sequential, ...)
  • WhereAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask<Boolean>> predicate, AwaitOperation awaitOperation = Sequential, ...)
  • SubscribeAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, AwaitOperation awaitOperation = Sequential, ...)
  • SubscribeAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, Action<Result> onCompleted, AwaitOperation awaitOperation = Sequential, ...)
  • SubscribeAwait(this Observable<T> source, Func<T, CancellationToken, ValueTask> onNextAsync, Action<Exception> onErrorResume, Action<Result> onCompleted, AwaitOperation awaitOperation = Sequential, ...)

public enum AwaitOperation
{
    /// <summary>All values are queued, and the next value waits for the completion of the asynchronous method.</summary>
    Sequential,
    /// <summary>Drop new value when async operation is running.</summary>
    Drop,
    /// <summary>If the previous asynchronous method is running, it is cancelled and the next asynchronous method is executed.</summary>
    Switch,
    /// <summary>All values are sent immediately to the asynchronous method.</summary>
    Parallel,
    /// <summary>All values are sent immediately to the asynchronous method, but the results are queued and passed to the next operator in order.</summary>
    SequentialParallel,
    /// <summary>Send the first value and the last value while the asynchronous method is running.</summary>
    ThrottleFirstLast
}

SelectAwait, WhereAwait, SubscribeAwait accept asynchronous methods and offer six patterns for handling values that arrive while the asynchronous method is executing. Sequential queues the values until the asynchronous method completes. Drop discards all values that arrive while it’s executing, useful for preventing multiple submissions in event handling. Switch cancels the ongoing asynchronous method and starts the next one, similar to Observable<Observable>.Switch. Parallel executes methods in parallel, like Observable<Observable>.Merge. SequentialParallel runs operations in parallel but ensures the values are passed to the next operator in the order they arrived. ThrottleFirstLast sends the first and last values received while the asynchronous method is running.
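As an illustration of the Drop pattern only, the following sketch models it with the base class library. It is not how R3 implements `AwaitOperation.Drop`; `DropGate<T>` is a hypothetical helper. While one asynchronous handler is in flight, any further value is rejected immediately instead of being queued.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class DropGate<T>
{
    private readonly Func<T, CancellationToken, ValueTask> handler;
    private int running; // 0 = idle, 1 = a handler is in flight

    public DropGate(Func<T, CancellationToken, ValueTask> handler)
    {
        this.handler = handler;
    }

    // Returns true if the value was handled, false if it was dropped.
    public async ValueTask<bool> TryRunAsync(T value, CancellationToken cancellationToken = default)
    {
        // Only the caller that flips 0 -> 1 may run; everyone else is dropped right away.
        if (Interlocked.CompareExchange(ref running, 1, 0) != 0)
        {
            return false;
        }

        try
        {
            await handler(value, cancellationToken);
            return true;
        }
        finally
        {
            Volatile.Write(ref running, 0);
        }
    }
}
```

This is the shape that prevents double submission: a second click arriving while the first request is still running is simply ignored, and the gate opens again once the handler finishes.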

Furthermore, the following time-based filtering methods now accept asynchronous methods as well.

  • Debounce(this Observable<T> source, Func<T, CancellationToken, ValueTask> throttleDurationSelector, ...)
  • ThrottleFirst(this Observable<T> source, Func<T, CancellationToken, ValueTask> sampler, ...)
  • ThrottleLast(this Observable<T> source, Func<T, CancellationToken, ValueTask> sampler, ...)
  • ThrottleFirstLast(this Observable<T> source, Func<T, CancellationToken, ValueTask> sampler, ...)

We have also made Chunk accept asynchronous methods, and SkipUntil/TakeUntil have variations that accept a CancellationToken or a Task.

  • SkipUntil(this Observable<T> source, CancellationToken cancellationToken)
  • SkipUntil(this Observable<T> source, Task task)
  • SkipUntil(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncFunc, ...)
  • TakeUntil(this Observable<T> source, CancellationToken cancellationToken)
  • TakeUntil(this Observable<T> source, Task task)
  • TakeUntil(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncFunc, ...)
  • Chunk(this Observable<T> source, Func<T, CancellationToken, ValueTask> asyncWindow, ...)

For example, using an asynchronous version of Chunk allows you to create chunks at random intervals, not just fixed ones, enabling complex logic to be written more naturally and simply.

Observable.Interval(TimeSpan.FromSeconds(1))
    .Index()
    .Chunk(async (_, ct) =>
    {
        await Task.Delay(TimeSpan.FromSeconds(Random.Shared.Next(0, 5)), ct);
    })
    .Subscribe(xs =>
    {
        Console.WriteLine(string.Join(", ", xs));
    });

async/await is indispensable in modern C# code, and we’ve made every effort to integrate it smoothly with Rx.

Retry operations can also benefit from async/await for better handling. Previously, Rx could only retry the entire pipeline, but with R3’s acceptance of async/await, retries can be performed on a per-asynchronous method execution basis.

button.OnClickAsObservable()
    .SelectAwait(async (_, ct) =>
    {
        var retry = 0;
    AGAIN:
        try
        {
            return await DownloadTextAsync("https://google.com/", ct);
        }
        catch
        {
            if (retry++ < 3) goto AGAIN;
            throw;
        }
    }, AwaitOperation.Drop)
    .Subscribe();

Repeat can also be implemented with async/await. In this case, managing repeat conditions can be simpler than relying solely on Rx operators, potentially offering higher readability. Prioritizing readability (and performance) in coding is crucial. Let’s continue to effectively integrate Rx with async/await for better code.

You can also create Observables from asynchronous methods with Create and CreateFrom, which might allow for more straightforward descriptions compared to forcibly twisting operators.

  • Create(Func<Observer<T>, CancellationToken, ValueTask> subscribe, ...)
  • CreateFrom(Func<CancellationToken, IAsyncEnumerable<T>> factory)

Naming Rules

In R3, the names of several methods have been changed from those in dotnet/reactive or UniRx. For example:

  • Buffer -> Chunk
  • StartWith -> Prepend
  • Distinct(selector) -> DistinctBy
  • Throttle -> Debounce
  • Sample -> ThrottleLast

Let’s explain the reasons for these changes.

First, when creating a LINQ-style library in .NET, the highest priority should be given to the method names implemented in LINQ to Objects (Enumerable). The reason why Buffer was changed to Chunk is because Enumerable.Chunk was added in .NET 6, and its function is the same as Buffer. Rx predates the introduction of Chunk, so there's nothing that can be done about the differing names, but if there are no constraints, names should align with LINQ to Objects. Therefore, Chunk is the only choice. The same goes for Prepend and DistinctBy.
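For reference, these are the LINQ to Objects methods that the new names line up with. The snippet uses only `System.Linq` (Chunk and DistinctBy require .NET 6 or later); the `LinqNamingDemo` class itself is just a hypothetical wrapper for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LinqNamingDemo
{
    public static string Describe()
    {
        // Enumerable.Chunk: fixed-size groups, the counterpart of Rx's Buffer.
        var chunks = new[] { 1, 2, 3, 4, 5 }.Chunk(2).Select(c => string.Join("+", c));

        // Enumerable.Prepend: put one element in front, the counterpart of Rx's StartWith.
        var prepended = new[] { 2, 3 }.Prepend(1);

        // Enumerable.DistinctBy: distinct by key, the counterpart of Rx's Distinct(selector).
        var distinct = new[] { "apple", "avocado", "banana" }.DistinctBy(s => s[0]);

        return string.Join(" | ",
            string.Join(",", chunks),
            string.Join(",", prepended),
            string.Join(",", distinct));
    }
}
```

`LinqNamingDemo.Describe()` returns `1+2,3+4,5 | 1,2,3 | apple,banana`.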

You might resist changing Throttle to Debounce. The reason for the change is that Debounce is the world's standard. In the Rx world, dotnet/reactive is the only one that refers to Debounce as Throttle. It could be argued that there's no need to change since Rx.NET is the progenitor of the Rx world, but now that it is in the minority, going along with the majority is also the right call.

The reason for changing to Debounce is not just that, but also the existence of ThrottleFirst / ThrottleLast. These take the first or last value in a sampling period, respectively, forming a pair. But (dotnet/reactive's) Throttle behaves entirely differently, so the name Throttle is confusing. dotnet/reactive itself lacks ThrottleFirst and only has Sample, which corresponds to ThrottleLast, so the name causes no problem there. However, once ThrottleFirst/ThrottleLast are adopted, the name inevitably has to be Debounce.

Regarding Sample, due to the symmetry in the names and functions of First/Last, it was renamed to ThrottleLast. In dotnet/reactive, since First doesn't exist, Sample would have been fine, but if adopting ThrottleFirst, the name inevitably becomes ThrottleLast.
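The behavioural difference between the three names is easier to see on concrete timestamps. The following is a simplified offline model, not R3's operators: `RateLimitModels` is a hypothetical class that takes already-timestamped events (times in milliseconds, sorted ascending) instead of using timers, and it assumes that any pending value is flushed when the input ends.

```csharp
using System;
using System.Collections.Generic;

public static class RateLimitModels
{
    // Emits the first value, then ignores everything until the interval has elapsed.
    public static List<T> ThrottleFirst<T>(IReadOnlyList<(long Time, T Value)> events, long interval)
    {
        var result = new List<T>();
        long? windowStart = null;
        foreach (var (time, value) in events)
        {
            if (windowStart is null || time - windowStart.Value >= interval)
            {
                windowStart = time;
                result.Add(value);
            }
        }
        return result;
    }

    // Opens a window on the first value and emits the latest value seen when the window closes.
    public static List<T> ThrottleLast<T>(IReadOnlyList<(long Time, T Value)> events, long interval)
    {
        var result = new List<T>();
        long? windowStart = null;
        T latest = default!;
        foreach (var (time, value) in events)
        {
            if (windowStart is not null && time - windowStart.Value >= interval)
            {
                result.Add(latest); // the window closed before this event arrived
                windowStart = null;
            }
            windowStart ??= time;
            latest = value;
        }
        if (windowStart is not null) result.Add(latest); // assumption: flush at end of input
        return result;
    }

    // Emits a value only if no other value follows it within the interval.
    public static List<T> Debounce<T>(IReadOnlyList<(long Time, T Value)> events, long interval)
    {
        var result = new List<T>();
        for (var i = 0; i < events.Count; i++)
        {
            var isLast = i == events.Count - 1;
            if (isLast || events[i + 1].Time - events[i].Time >= interval)
            {
                result.Add(events[i].Value);
            }
        }
        return result;
    }
}
```

For events a, b, c, d, e at 0, 600, 1200, 1800 and 4000 ms with a 1000 ms interval, this model gives a, c, e for ThrottleFirst, b, d, e for ThrottleLast, and d, e for Debounce. The first two form a symmetric pair, while Debounce follows a different rule altogether.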

There is a compromise to keep the name Sample and make it an alias for ThrottleLast (as is the case with RxJava), but having different names for the same function confuses users. There are quite a few questions like what's the difference between sample and throttleLast? Rx is complicated enough, and to avoid unnecessary confusion, aliases should definitely be avoided. Aliases like mapping Select to Map or Where to Filter are utterly foolish.

Default Scheduler for Platforms

In dotnet/reactive, the default scheduler is almost fixed. Technically, it’s possible to replace some behaviors by appropriately implementing IPlatformEnlightenmentProvider or IConcurrencyAbstractionLayer, but it's unnecessarily complicated and mostly hidden with [EditorBrowsable(EditorBrowsableState.Never)], so it's hardly expected to be used properly.

However, for Timer or Delay, if it’s WPF, they operate on DispatcherTimer, and in Unity, they work on Timer in the PlayerLoop, automatically dispatching to the main thread, which is convenient and advantageous for performance as ObserveOn becomes unnecessary in most cases.

In R3, we made it simple to replace the default TimeProvider/FrameProvider.

public static class ObservableSystem
{
    public static TimeProvider DefaultTimeProvider { get; set; } = TimeProvider.System;
    public static FrameProvider DefaultFrameProvider { get; set; } = new NotSupportedFrameProvider();
}

By replacing them at application startup, the best scheduler for that application will be used by default.

// For example, in WPF, the Dispatcher series is set, so it automatically returns to the UI thread
public static class WpfProviderInitializer
{
    public static void SetDefaultObservableSystem(Action<Exception> unhandledExceptionHandler)
    {
        ObservableSystem.RegisterUnhandledExceptionHandler(unhandledExceptionHandler);
        ObservableSystem.DefaultTimeProvider = new WpfDispatcherTimerProvider();
        ObservableSystem.DefaultFrameProvider = new WpfRenderingFrameProvider();
    }
}

// In the case of Unity, PlayerLoop-based ones are used, avoiding ThreadPool
public static class UnityProviderInitializer
{
    [RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.AfterAssembliesLoaded)]
    public static void SetDefaultObservableSystem()
    {
        SetDefaultObservableSystem(static ex => UnityEngine.Debug.LogException(ex));
    }

    public static void SetDefaultObservableSystem(Action<Exception> unhandledExceptionHandler)
    {
        ObservableSystem.RegisterUnhandledExceptionHandler(unhandledExceptionHandler);
        ObservableSystem.DefaultTimeProvider = UnityTimeProvider.Update;
        ObservableSystem.DefaultFrameProvider = UnityFrameProvider.Update;
    }
}

Because dotnet/reactive cannot change its default schedulers, it can hardly support a variety of platforms.

internal static class SchedulerDefaults
{
    internal static IScheduler ConstantTimeOperations => ImmediateScheduler.Instance;
    internal static IScheduler TailRecursion => ImmediateScheduler.Instance;
    internal static IScheduler Iteration => CurrentThreadScheduler.Instance;
    internal static IScheduler TimeBasedOperations => DefaultScheduler.Instance;
    internal static IScheduler AsyncConversions => DefaultScheduler.Instance;
}

Especially in AOT scenarios (NativeAOT, Unity IL2CPP) or web publishing (WebGL, WASM), there are situations where ThreadPool cannot be used and must be absolutely avoided. Thus, SchedulerDefaults.TimeBasedOperations being essentially fixed to ThreadPoolScheduler is regrettably restrictive.

Pull IAsyncEnumerable vs Push Observable

IAsyncEnumerable (or UniTask's IUniTaskAsyncEnumerable) is a pull-based asynchronous sequence. Reactive Extensions (Rx) is a push-based asynchronous sequence. They are similar. The fact that you can do LINQ-like operations with both is also similar. It's natural to say that which one to use depends on the case, but then, what are those cases? When should you use which? It would be nice to have some criteria for this decision.

Basically, if there’s a buffer (queue) behind the scenes, pull-based approaches seem suitable, so for network-related scenarios, it might be a good idea to use IAsyncEnumerable. Indeed, natural opportunities to use it come up with System.IO.Pipelines or System.Threading.Channels.

The place to use Rx is indeed related to events.

The deciding factor on which to use should be to choose the representation that is natural for the source. Raw events, such as OnMove or OnClick, are entirely push-based, with no buffer involved. It would also be suitable for high-frequency events like sensor data, or for events that come through the network where the buffer is hidden and delivered purely as events. This means Rx is the natural choice to handle them.
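The contrast can be sketched with the base class library alone. In the pull case a bounded `Channel<T>` is the buffer and the consumer sets the pace with `await foreach`; in the push case a plain event delivers values only to whoever is subscribed at that moment. `PullVsPush` and `PushSource` are hypothetical names used for this illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PullVsPush
{
    // Pull: a queue sits between producer and consumer, and the consumer asks for the next item.
    public static async Task<List<int>> PullFromChannelAsync(int count, int capacity)
    {
        var channel = Channel.CreateBounded<int>(capacity);

        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < count; i++)
            {
                await channel.Writer.WriteAsync(i); // waits while the buffer is full (backpressure)
            }
            channel.Writer.Complete();
        });

        var received = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            received.Add(item);
        }

        await producer;
        return received;
    }
}

// Push: no buffer at all; a value raised while nobody is subscribed is simply gone.
public sealed class PushSource
{
    public event Action<int>? Pushed;

    public void Emit(int value) => Pushed?.Invoke(value);
}
```

With the channel, every value written is eventually read in order, however slow the reader is. With the event, a value emitted before a handler is attached is never observed, which is exactly the buffer-less shape that Rx models naturally.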

You could interpose a queue and deal with it via IAsyncEnumerable, but that would be unnatural. Alternatively, expressing the intentional dropping of values by not using a queue could also be done, but again, that’s unnatural. Being unnatural usually means worse performance and less clarity. In other words, it’s not good. Therefore, handle event-related things with Rx. With R3, integration with async/await allows you to explicitly specify buffering during asynchronous operations or dropping values with operators. This is clear and performs well. Let’s use R3.

Conclusion

I’ve pointed out many things, but I have nothing but gratitude for the original creators of Rx.NET. Once again, I am in awe of the brilliance of the Rx concept and the organized functionality of its various operators. Although some parts of the implementation have become outdated, it has a track record, stability, and high quality. I have been using it from the very beginning and have been enthusiastic about it. I also want to thank the current maintainers. It’s very difficult to maintain a widely used library in an ever-changing environment.

However, I wanted to revive the value of Rx. And if it was to be rebuilt, I believed I was the only one who could do it. I know the history and implementation of Rx from the beginning, have implemented Rx itself (UniRx), and through its widespread use, have become familiar with many use cases and issues. I’ve also been involved on the application side of Rx in large-scale implementations for game titles and implemented a custom runtime for async/await (UniTask) that has also been widely used, giving me insight into all aspects of this area. I have also accumulated experience in implementing high-performance serializers that have become standards in the industry, such as MessagePack for C# and MemoryPack, developing network frameworks like MagicOnion and applying new protocols (HTTP/2, gRPC), and in implementing modern high-performance C# in various areas with ZLogger and AlterNats. I have a sufficient technical foundation.

It’s fine for there to be multiple futures, so I hope you will see R3 as one possible future for Rx that I present. There may be another evolution and future for dotnet/reactive.

With that said, I believe R3 has shown enough potential and possibility to be considered a replacement. I have tried my best to consider migration scenarios as well, so please give it a try…!

Written by Yoshifumi Kawai

a.k.a. neuecc. Creator of UniRx, UniTask, MessagePack for C#, MagicOnion etc. Microsoft MVP for C#. CEO/CTO of Cysharp Inc. Live and work in Tokyo, Japan.