Skip to content

Instantly share code, notes, and snippets.

@jnm2
Created August 1, 2025 22:43
Show Gist options
  • Save jnm2/77250e364f8d817b336b7c7d34fc6d1d to your computer and use it in GitHub Desktop.
Save jnm2/77250e364f8d817b336b7c7d34fc6d1d to your computer and use it in GitHub Desktop.
internal static class RxExtensions
{
    /// <summary>
    /// <para>
    /// Fixes an issue when awaiting Rx.NET observables when <see cref="SynchronizationContext.Current"/> is <see
    /// langword="null"/>, where the <see langword="await"/> resumes without disposing resources in the observables.
    /// Those resources are not disposed until the program suspends at a later unrelated <see langword="await"/>.
    /// </para>
    /// <para>
    /// This indefinite delay in disposing resources happens because Rx.NET forwards OnCompleted and OnError to the
    /// outer observer before disposing the subscription to the inner observer.¹ When forwarding OnCompleted/OnError to
    /// <see cref="System.Reactive.Subjects.AsyncSubject{T}"/>, its internal <c>AwaitObserver</c> either schedules the
    /// continuation of your <see langword="await"/> using <see cref="SynchronizationContext.Post"/>, or runs your
    /// continuation inline if <see cref="SynchronizationContext.Current"/> is <see langword="null"/>.² When your
    /// continuation runs inline, <see cref="System.Reactive.Subjects.AsyncSubject{T}"/>'s
    /// <c>AwaitObserver.OnCompleted</c>/<c>OnError</c> do not return until your program hits a later unrelated <see
    /// langword="await"/> which suspends. This causes the forwarding of OnCompleted/OnError to block while they run
    /// your program's continuation. When your program suspends at a later unrelated <see langword="await"/>, the
    /// OnCompleted/OnError call returns and the disposal happens.¹
    /// </para>
    /// <para>
    /// Footnotes:
    /// <list type="number">
    /// <item><see href="https://github.com/dotnet/reactive/blob/rxnet-v6.0.1/Rx.NET/Source/src/System.Reactive/Internal/Sink.cs#L54-L55"/></item>
    /// <item><see href="https://github.com/dotnet/reactive/blob/rxnet-v6.0.1/Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs#L380-L404"/></item>
    /// </list>
    /// </para>
    /// </summary>
    /// <typeparam name="T">The type of the elements produced by <paramref name="source"/>.</typeparam>
    /// <param name="source">The observable to wrap.</param>
    /// <returns>
    /// An observable that forwards OnNext directly, and that disposes its subscription to
    /// <paramref name="source"/> before forwarding OnCompleted or OnError to the subscriber.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <see langword="null"/>.</exception>
    public static IObservable<T> PrioritizeDisposal<T>(this IObservable<T> source)
    {
        // Fail at the call site rather than with a NullReferenceException on the first Subscribe.
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        return new PrioritizeDisposalObservable<T>(source);
    }

    /// <summary>
    /// Wraps <paramref name="source"/> so that each subscriber is fed through an <see cref="Observer"/> which
    /// disposes the upstream subscription before passing on a terminal notification.
    /// </summary>
    private sealed class PrioritizeDisposalObservable<T>(IObservable<T> source) : IObservable<T>
    {
        /// <summary>
        /// Subscribes a wrapping observer to the source and hands the resulting subscription to that wrapper.
        /// </summary>
        /// <param name="observer">The downstream observer that receives the notifications.</param>
        /// <returns>The subscription returned by the source.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <see langword="null"/>.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer is null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            var wrapper = new Observer(observer);
            var subscription = source.Subscribe(wrapper);

            // The source may already have called OnCompleted/OnError on the wrapper during Subscribe above.
            // Those notifications wait on this hand-off before they dispose and forward.
            wrapper.ProvideSubscription(subscription);
            return subscription;
        }

        /// <summary>
        /// Forwards OnNext immediately; defers OnCompleted/OnError until the upstream subscription is known and
        /// has been disposed.
        /// </summary>
        private sealed class Observer(IObserver<T> observer) : IObserver<T>
        {
            // Completed by ProvideSubscription once Subscribe on the source has returned.
            private readonly TaskCompletionSource<IDisposable> subscriptionSource = new();

            /// <summary>Supplies the upstream subscription that terminal notifications will dispose.</summary>
            public void ProvideSubscription(IDisposable subscription)
            {
                subscriptionSource.SetResult(subscription);
            }

            /// <summary>Disposes the upstream subscription, then forwards OnCompleted downstream.</summary>
            public void OnCompleted()
            {
                DisposeThenForward(observer.OnCompleted);
            }

            /// <summary>Disposes the upstream subscription, then forwards <paramref name="error"/> downstream.</summary>
            public void OnError(Exception error)
            {
                DisposeThenForward(() => observer.OnError(error));
            }

            /// <summary>Forwards <paramref name="value"/> downstream without delay.</summary>
            public void OnNext(T value)
            {
                observer.OnNext(value);
            }

            /// <summary>
            /// Once the upstream subscription is available, disposes it and then invokes
            /// <paramref name="forward"/> to deliver the terminal notification.
            /// </summary>
            private void DisposeThenForward(Action forward)
            {
                // The scheduler is passed explicitly so the continuation does not bind to whatever
                // TaskScheduler.Current happens to be on the thread delivering the notification.
                // NOTE(review): the continuation task is discarded, so an exception thrown by Dispose or by the
                // downstream observer is not rethrown to the caller of OnCompleted/OnError.
                subscriptionSource.Task.ContinueWith(
                    task =>
                    {
                        task.Result.Dispose();
                        forward();
                    },
                    TaskScheduler.Default);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment