using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Concurrent;
/// <summary>
/// Program entry point: creates an <see cref="EventStream"/>, records it in a list of
/// disposables, prints the exit prompt and the cleanup message, then iterates the list.
/// </summary>
/// <param name="args">Command-line arguments; not read in the visible lines.</param>
public static void Main(string[] args){
var eventStream = new EventStream();
// Everything that needs cleanup is collected here and walked in the loop below.
List<IDisposable> disposables = new();
disposables.Add(eventStream);
Console.WriteLine("Press enter to exit");
// NOTE(review): no read of the Enter key is visible between the prompt and the
// cleanup message in this view - confirm against the full file.
Console.WriteLine("Cleaning up");
// NOTE(review): the loop body and the closing braces are not visible in this view;
// presumably each entry is disposed - confirm.
foreach(var d in disposables){
/// <summary>
/// Event type published by <see cref="EventStream"/>; carries a single integer payload.
/// </summary>
/// <param name="Payload">Integer value attached to the event.</param>
public record StreamEvent(int Payload);
/// <summary>
/// Observable, disposable source of <see cref="StreamEvent"/> values. Observers are
/// tracked as <see cref="EventStreamSubscription"/> entries keyed by an integer id.
/// </summary>
public class EventStream: IObservable<StreamEvent>, IDisposable {
// Id given to the next subscription created by Subscribe; starts at 1.
private int _nextSubscriptionId = 1;
// Registered subscriptions keyed by subscription id. Typed as IDictionary but backed
// by a ConcurrentDictionary.
private IDictionary<int, EventStreamSubscription> _subscriptions = new ConcurrentDictionary<int, EventStreamSubscription>();
// Provides the token passed to the delayed read; Cancel() is called on it during disposal.
private CancellationTokenSource _cancellationTokenSource;
// Payload assigned to the next StreamEvent created; starts at 1.
private int _payload = 1;
// NOTE(review): the header of the enclosing member is not visible in this view;
// presumably this is the EventStream constructor - confirm.
_cancellationTokenSource = new CancellationTokenSource();
Console.WriteLine("Creating the event stream");
// Token handed to Task.Delay below and checked again after the delay.
var token = _cancellationTokenSource.Token;
// NOTE(review): the await below requires an async context, and the repeated-read
// wording suggests a loop; neither header is visible in this view - confirm.
Console.WriteLine("reading from stream");
// Wait 500 ms; the token is passed so the delay can be cancelled.
await Task.Delay(TimeSpan.FromMilliseconds(500), token);
// Throws if cancellation has been requested by this point.
token.ThrowIfCancellationRequested();
// Create the next event from the current payload value, then advance the counter.
var evt = new StreamEvent(_payload++);
// NOTE(review): no use of evt is visible after this line; presumably it is passed
// to PublishEvent - confirm.
/// <summary>
/// Writes the event's payload to the console, then passes the event to the observer
/// of every registered subscription via <c>OnNext</c>.
/// </summary>
/// <param name="evt">Event to deliver to the observers.</param>
private void PublishEvent(StreamEvent evt){
Console.WriteLine("Publishing event: " + evt.Payload);
// Walk the subscriptions currently held in the dictionary.
foreach(var subscription in _subscriptions.Values){
// NOTE(review): no exception handling around OnNext is visible in this view, so a
// throwing observer would presumably stop delivery to the rest - confirm.
subscription.Observer.OnNext(evt);
/// <summary>
/// Registers an observer: wraps it in an <see cref="EventStreamSubscription"/> that
/// takes the next subscription id and stores it in the subscription dictionary.
/// </summary>
/// <param name="observer">Observer that should receive published events.</param>
/// <returns>
/// Declared as <see cref="IDisposable"/>. NOTE(review): the return statement is not
/// visible in this view; presumably the created subscription - confirm.
/// </returns>
public IDisposable Subscribe(IObserver<StreamEvent> observer){
// RemoveSubscription is supplied as the subscription's dispose callback.
// NOTE(review): _nextSubscriptionId++ is a plain, non-atomic increment; if Subscribe
// can be called from several threads, ids could collide - confirm the threading model.
var subscription = new EventStreamSubscription(_nextSubscriptionId++, observer, RemoveSubscription);
// IDictionary.Add throws ArgumentException if the key is already present.
_subscriptions.Add(subscription.SubscriptionId, subscription);
/// <summary>
/// Removes the subscription with the given id from the subscription dictionary.
/// Passed to each <see cref="EventStreamSubscription"/> as its dispose callback.
/// </summary>
/// <param name="subscriptionId">Id of the subscription to remove.</param>
private void RemoveSubscription(int subscriptionId){
// The bool result of Remove is ignored, so an unknown id is a no-op.
_subscriptions.Remove(subscriptionId);
// NOTE(review): the header of the enclosing member is not visible in this view;
// presumably this is EventStream.Dispose - confirm.
Console.WriteLine("Disposing the event stream");
// Request cancellation on the token that the delayed read observes.
// NOTE(review): no call to _cancellationTokenSource.Dispose() is visible in this
// view - confirm whether the token source is disposed elsewhere.
_cancellationTokenSource.Cancel();
/// <summary>
/// Disposable handle for one observer's registration. Holds the observer, the
/// subscription id, and a callback that is invoked with that id on disposal.
/// </summary>
public class EventStreamSubscription : IDisposable {
// Observer exposed through the Observer property.
private IObserver<StreamEvent> _observer;
// Id exposed through the SubscriptionId property and passed to the dispose callback.
private int _subscriptionId;
// Callback invoked with the subscription id from Dispose(bool).
private Action<int> _disposeCallback;
/// <summary>
/// Creates a subscription, storing the id and the dispose callback.
/// </summary>
/// <param name="subscriptionId">Id identifying this subscription.</param>
/// <param name="observer">Observer associated with this subscription.</param>
/// <param name="disposeCallback">Callback invoked with the id when the subscription is disposed.</param>
public EventStreamSubscription(int subscriptionId, IObserver<StreamEvent> observer, Action<int> disposeCallback){
_subscriptionId = subscriptionId;
_disposeCallback = disposeCallback;
// NOTE(review): no assignment of observer to _observer is visible in this view.
// If it is also absent from the full file, the Observer property returns null -
// confirm.
/// <summary>Id that identifies this subscription.</summary>
public int SubscriptionId => _subscriptionId;
/// <summary>Observer held by this subscription.</summary>
public IObserver<StreamEvent> Observer => _observer;
/// <summary>
/// Invokes the dispose callback, when one is set, with this subscription's id.
/// </summary>
/// <param name="disposing">
/// NOTE(review): not read in the visible lines - confirm against the full file.
/// </param>
protected void Dispose(bool disposing){
if(_disposeCallback != null){
// NOTE(review): the callback is not cleared in the visible lines, so a second call
// would presumably invoke it again - confirm.
_disposeCallback(_subscriptionId);
// Finalizer for the subscription. NOTE(review): its body is not visible in this
// view; presumably it forwards to Dispose(false) - confirm.
~EventStreamSubscription(){