using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using System.Diagnostics;
// Demo routine: builds an Interval-based sequence with console logging at each stage,
// subscribes a logging observer, connects the sequence, then awaits it as a Task.
// NOTE(review): this excerpt appears to be missing lines (braces, the operator that wraps the
// token.Register lambda, and whatever makes the sequence connectable) - confirm against the full file.
async Task Test(CancellationToken token)
var publishedSequence = Observable
// Source ticks every 50 ms; each tick is logged before flowing downstream.
.Interval(TimeSpan.FromMilliseconds(50))
.Do(n => Console.WriteLine($"Emitting: {n}"))
// Observer factory that registers a callback on the token. The two statements below are
// presumably the callback body: log, then signal OperationCanceledException via OnError.
// NOTE(review): the enclosing operator is not visible; the "TakeUntil passed" log suggests TakeUntil - confirm.
observer => token.Register(
Console.WriteLine($"Cancelling");
observer.OnError(new OperationCanceledException(token));
// Logs elements that made it past the cancellation stage.
.Do(n => Console.WriteLine($"TakeUntil passed: {n}"))
// Logs once when the sequence ends, whatever the reason.
.Finally(() => Console.WriteLine($"Finally"))
// Logging observer covering all three notification kinds; disposed at scope exit by `using`.
using var subscription = publishedSequence.Subscribe(
onNext: n => Console.WriteLine($"OnNext: {n}"),
onError: e => Console.WriteLine($"OnError: {e}"),
onCompleted: () => Console.WriteLine("OnCompleted"));
// Connect() implies publishedSequence is a connectable observable.
// NOTE(review): the Publish()-style call that would make it so is not visible here - confirm.
using var connection = publishedSequence.Connect();
// Converts the sequence to a Task and awaits it; this happens after Connect() above.
await publishedSequence.ToTask();
// Sets the thread-pool maximums to 100 worker threads and 100 completion-port threads.
ThreadPool.SetMaxThreads(workerThreads: 100, completionPortThreads: 100);
// Token source constructed with a 1000 ms delay argument (auto-cancel timeout).
// NOTE(review): the call that hands cts.Token to Test is not visible in this excerpt.
var cts = new CancellationTokenSource(1000);
// Rx extension helpers.
// NOTE(review): this excerpt is missing lines (braces, several local-function bodies, and the
// declarations of syncRoot / subscribed / cancelled) - confirm details against the full file.
public static class RxExt
/// <summary>
/// Wraps <paramref name="this"/> so that a callback is also registered on
/// <paramref name="cancelToken"/>; the visible Cancel helper signals an
/// <see cref="OperationCanceledException"/> to the observer through OnError.
/// </summary>
/// <param name="this">The source sequence to subscribe to.</param>
/// <param name="cancelToken">The token whose cancellation is observed.</param>
/// <returns>
/// A throwing sequence if the token is already cancelled, a never-ending sequence if the token
/// cannot be cancelled, otherwise a sequence built with Observable.Create.
/// </returns>
public static IObservable<T> WithCancellation<T>(this IObservable<T> @this, CancellationToken cancelToken)
// Already cancelled: fail immediately without touching the source.
if (cancelToken.IsCancellationRequested)
return Observable.Throw<T>(new OperationCanceledException(cancelToken));
// NOTE(review): for a token that can never be cancelled this returns Never<T>(), which discards
// every element of @this; returning @this unchanged looks like the intended behavior - confirm.
if (!cancelToken.CanBeCanceled)
return Observable.Never<T>();
return Observable.Create<T>(observer =>
// Both handles start as no-op disposables and are replaced below.
IDisposable subscription = Disposable.Empty;
IDisposable rego = Disposable.Empty;
// Subscribe to the source first, then register the cancellation callback.
subscription = @this.Subscribe(OnNext, OnError, OnCompleted);
rego = cancelToken.Register(OnCancel, null);
// Teardown is delegated to the Dispose local function (body not visible in this excerpt).
return Disposable.Create(Dispose);
// Callback registered on the token above; the state argument is ignored.
void OnCancel(object? _, CancellationToken token)
// Asserts (debug builds only) that the current thread holds the syncRoot lock.
// NOTE(review): the lock acquisition itself is not visible here.
Debug.Assert(Monitor.IsEntered(syncRoot));
if (!subscribed) { return; }
if (!cancelToken.IsCancellationRequested)
// Signals cancellation to the observer, at most once and only while subscribed.
void Cancel(CancellationToken token)
Debug.Assert(Monitor.IsEntered(syncRoot));
if (!subscribed || cancelled) { return; }
observer.OnError(new OperationCanceledException(token));
// NOTE(review): the branches guarded by the following IsCancellationRequested checks are not
// visible; presumably each handler re-checks the token before forwarding - confirm.
if (cancelToken.IsCancellationRequested)
void OnError(Exception error)
if (cancelToken.IsCancellationRequested)
if (cancelToken.IsCancellationRequested)