using System.Collections.Generic;
using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Disposables;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;
public class ObservableTests : ReactiveTest
private static IObservable<string> CreateObservable(AsyncOperationWorker worker, IScheduler scheduler)
return Observable.Create<string>(observer =>
var schedulingIsCompleted = new TaskCompletionSource<bool>();
scheduler.Schedule<object>(
var task = Task.Run(async () => await worker.ExecuteAsync());
catch (AggregateException ex)
foreach (var innerException in ex.InnerExceptions)
if (innerException is OperationCanceledException)
schedulingIsCompleted.TrySetResult(true);
observer.OnError(innerException);
schedulingIsCompleted.TrySetResult(true);
observer.OnNext("From async");
() => observer.OnCompleted());
private class AsyncOperationWorker
private readonly CancellationToken _token;
private readonly Exception _simulatedException;
public AsyncOperationWorker(CancellationToken token, Exception simulatedException = null)
_simulatedException = simulatedException;
public async Task ExecuteAsync()
if (_simulatedException != null)
throw _simulatedException;
_token.ThrowIfCancellationRequested();
public void Async_schedule_completes_after_sync()
var testScheduler = new TestScheduler();
var testObserver = testScheduler.CreateObserver<string>();
var cts = new CancellationTokenSource();
var worker = new AsyncOperationWorker(cts.Token);
var source = CreateObservable(worker, testScheduler);
source.Subscribe(testObserver);
var expected = new[] { OnCompleted<string>(1) };
Assert.That(testObserver.Messages, Is.EqualTo(expected).Using(new NotificationComparer()));
public void Should_contain_OnError_when_worker_throws_exception()
var testScheduler = new TestScheduler();
var testObserver = testScheduler.CreateObserver<string>();
var exception = new InvalidOperationException("Test failure");
var worker = new AsyncOperationWorker(CancellationToken.None, exception);
var source = CreateObservable(worker, testScheduler);
source.Subscribe(testObserver);
OnError<string>(1, exception)
Assert.That(testObserver.Messages, Is.EqualTo(expected).Using(new NotificationComparer()));
public void Should_contain_OnNext_when_operation_succeeds()
var testScheduler = new TestScheduler();
var testObserver = testScheduler.CreateObserver<string>();
var worker = new AsyncOperationWorker(CancellationToken.None);
var source = CreateObservable(worker, testScheduler);
source.Subscribe(testObserver);
OnCompleted<string>(30_000_000)
Assert.That(testObserver.Messages, Is.EqualTo(expected).Using(new NotificationComparer()));
class NotificationComparer : IEqualityComparer<Recorded<Notification<string>>>
public bool Equals(Recorded<Notification<string>> x, Recorded<Notification<string>> y)
if (x.Value.Kind != y.Value.Kind || x.Time != y.Time)
case NotificationKind.OnNext:
return x.Value.Value.Equals(y.Value.Value);
case NotificationKind.OnError:
var xEx = x.Value.Exception;
var yEx = y.Value.Exception;
return xEx?.GetType() == yEx?.GetType() &&
xEx?.Message == yEx?.Message;
case NotificationKind.OnCompleted:
public int GetHashCode(Recorded<Notification<string>> obj)
return HashCode.Combine(obj.Time, obj.Value.Kind);
public static IDisposable Schedule(this IScheduler scheduler, TimeSpan dueTime, Action<Action<TimeSpan>> action)
throw new ArgumentNullException(nameof(scheduler));
throw new ArgumentNullException(nameof(action));
return scheduler.Schedule(action, dueTime, (a, self) => a(ts => self(a, ts)));
public static void Main()
new NUnitLite.AutoRun().Execute(["--noc" ]);