using System.Collections.Generic;
public static void Main()
new[]{1,2,3,4,5}.ToObservable().Map(_=>_*2).Subscribe(new ConsoleObserver<int>());
public interface IObserver<in T>
void OnError(Exception ex);
public interface IObservable<out T>
IDisposable Subscribe(IObserver<T> observer);
public class EmptyDisposable : IDisposable
public class ConsoleObserver<T> : IObserver<T>
Console.WriteLine("completed");
public void OnError(Exception ex)
Console.WriteLine($"Error: {ex.Message}");
public void OnNext(T data)
Console.WriteLine($"Data: {data}");
public class Observable<T> : IObservable<T>
Func<IObserver<T>, IDisposable> subscribeDelegate;
public Observable(Func<IObserver<T>, IDisposable> subscribeDelegate)
this.subscribeDelegate = subscribeDelegate;
public IDisposable Subscribe(IObserver<T> observer)
return subscribeDelegate(observer);
public class ToObservable<T> : IObservable<T>
public ToObservable(IEnumerable<T> data)
public IDisposable Subscribe(IObserver<T> observer)
foreach (var item in data)
return new EmptyDisposable();
public class MapObservable<T> : IObservable<T>, IObserver<T>
public MapObservable(IObservable<T> source, Func<T, T> selector)
public void OnError(Exception ex)
public void OnNext(T data)
_observer.OnNext(_selector(data));
public IDisposable Subscribe(IObserver<T> observer)
return _source.Subscribe(this);
public static class ObservableExpressions
public static IObservable<T> ToObservable<T>(this IEnumerable<T> src)
return new ToObservable<T>(src);
public static IObservable<T> Map<T>(this IObservable<T> src, Func<T, T> selector)
return new MapObservable<T>(source: src, selector: selector);