using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
// Builds a sequence via CreateObservable, passing the current-thread scheduler and a zero
// TimeSpan as the delay argument, then attaches a handler for errors of type Exception.
// NOTE(review): the braces around the handler body and the end of this statement
// (terminator / Subscribe call) are not visible in this excerpt — confirm against the full file.
CreateObservable(Scheduler.CurrentThread,TimeSpan.FromSeconds(0))
.Catch<string, Exception>(ex =>
// Log the error message, then hand back an empty sequence as the replacement.
Console.WriteLine($"Caught first exception in stream: {ex.Message}");
return Observable.Empty<string>();
// Defines `observable` through Observable.Create with an async delegate that receives the
// observer and a cancellation token.
// NOTE(review): this excerpt is missing the opening `try`, the lambda/loop/if braces and any
// `else` branches, so the exact control flow below cannot be determined from here.
var observable = Observable.Create<string>(async (observer, cancellationToken) =>
// Signals an OperationCanceledException to the observer through OnError.
observer.OnError( new OperationCanceledException());
// NOTE(review): the `try` this catch belongs to is not visible in this excerpt.
} catch (AggregateException ex)
// NOTE(review): "fwe" looks like leftover debugging output — confirm it is intentional.
{Console.WriteLine("fwe");
// Walk every exception wrapped by the AggregateException.
foreach (var innerException in ex.InnerExceptions)
if (innerException is OperationCanceledException)
// A wrapped cancellation is rethrown as a fresh OperationCanceledException, so the
// original exception instance (and its stack trace) is not preserved.
{throw new OperationCanceledException();
// NOTE(review): as shown, this call follows the throw; whether it is an else-branch for
// non-cancellation errors or unreachable code depends on braces missing here — verify.
observer.OnError(innerException);
// NOTE(review): returns a completed Task<string> holding "". Confirm which delegate this
// return belongs to and that it matches the Observable.Create overload being used.
return Task.FromResult<string>("");
// Handler for errors of type Exception: logs the message and substitutes an empty sequence.
// NOTE(review): presumably chained onto the Observable.Create call above — the closing of
// that call is not visible here.
.Catch<string, Exception>(ex =>
Console.WriteLine($"Caught exception in stream: {ex.Message}");
return Observable.Empty<string>();
// Subscribes to `observable` with console-logging handlers for values, errors and completion,
// keeping the returned subscription in `subscription`.
// NOTE(review): if the Catch handler for Exception is attached upstream of this Subscribe,
// onError would presumably only fire when that handler itself fails — confirm.
var subscription = observable.Subscribe(
onNext: v => Console.WriteLine($"Received: {v}"),
onError: ex => Console.WriteLine($"Error: {ex.Message}"),
onCompleted: () => Console.WriteLine("Completed"));
// NOTE(review): this message announces disposal, but no subscription.Dispose() call is
// visible in this excerpt — verify it follows in the full file.
Console.WriteLine("Disposing subscription (cancelling)...");
private static IObservable<string> CreateObservable(IScheduler scheduler, TimeSpan delay)
return Observable.Create<string>(observer =>
var schedulingIsCompleted = new TaskCompletionSource<bool>();
Func<IScheduler, IDisposable> recursiveSchedule = null;
recursiveSchedule = (sched) =>
observer.OnError(new OperationCanceledException());
scheduler.Schedule(TimeSpan.FromSeconds(3), () => observer.OnCompleted());
return scheduler.Schedule(
(sched, dueTime) => recursiveSchedule(sched)