using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Runtime.ExceptionServices;
/// <summary>
/// Demo entry point: builds an observable through <c>CreateObservableEx</c> whose
/// async callback reports an error, then consumes that observable three ways.
/// </summary>
// NOTE(review): the brace / closing-delimiter lines of this method and of the
// lambda below are not present in this view — confirm against the full file.
public static void Main()
// The callback receives the observer (o) and a cancellation token (ct).
var xs = CreateObservableEx<Unit>(async (o, ct) =>
// Short delay that observes the supplied cancellation token.
await Task.Delay(10, ct);
// Signal failure to the observer; no OnNext or OnCompleted call is made here.
o.OnError(new Exception());
// First subscription: passes only an element handler, no error handler.
xs.Subscribe(x => Console.WriteLine(x));
// Second subscription: same element handler plus an error handler that prints the message.
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
// Third consumer via Wait() — presumably blocks until the sequence terminates
// and surfaces its error to the caller; TODO confirm against the Rx documentation.
xs.DefaultIfEmpty().Wait();
public static IObservable<TSource> CreateObservableEx<TSource>(
Func<IObserver<TSource>, CancellationToken, Task> subscribe)
return Observable.Create<TSource>(async (observer, cancellationToken) =>
Action<Exception> propagateError = (Exception error) =>
try { observer.OnError(error); }
ThreadPool.QueueUserWorkItem(_ => ExceptionDispatchInfo.Capture(ex).Throw());
var innerObserver = Observer.Create<TSource>(observer.OnNext,
propagateError, observer.OnCompleted);
try { await subscribe(innerObserver, cancellationToken); }