using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
// Manager used by Main below; its request type is object and its notification type is string.
private static SubscriptionManager<object, string> _manager = new();
// Channel name passed to every Subscribe/Unsubscribe call made in Main.
private const string Channel = "BTCUSDT";
/// <summary>
/// Demo entry point: registers two handlers on <see cref="Channel"/>, removes the first
/// registration again, and prints both tokens.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
public static void Main(string[] args)
{
    // Nothing in this method invokes the handlers, so their messages are only
    // printed if something else dispatches a notification to them.
    var token = _manager.Subscribe(Channel, data =>
    {
        Console.WriteLine("SUBSCRIPTION 1");
        return ValueTask.CompletedTask;
    });

    var token2 = _manager.Subscribe(Channel, data =>
    {
        Console.WriteLine("SUBSCRIPTION 2");
        return ValueTask.CompletedTask;
    });

    // Only the first registration is removed; the second one stays in the manager.
    _manager.Unsubscribe(Channel, token);

    Console.WriteLine("SUBSCRIPTION 1 token => {0}", token);
    Console.WriteLine("SUBSCRIPTION 2 token => {0}", token2);
}
/// <summary>
/// Immutable description of one subscription: the channel it belongs to, an optional
/// request payload, and the handler to call with notifications.
/// </summary>
/// <typeparam name="TRequest">Type of the request payload stored with the subscription.</typeparam>
/// <typeparam name="TNotification">Type of the notification passed to the handler.</typeparam>
public sealed class Subscription<TRequest, TNotification>
    where TNotification : class
{
    /// <summary>Creates a subscription with a default request and no handler.</summary>
    /// <param name="channel">Channel name; stored as given.</param>
    // `default` (not `null`) so the call also compiles when TRequest is a value type.
    public Subscription(string channel) : this(channel, default, null) { }

    /// <summary>Creates a subscription with a default request.</summary>
    /// <param name="channel">Channel name; stored as given.</param>
    /// <param name="handler">Callback to store in <see cref="Handler"/>; may be null.</param>
    public Subscription(string channel, Func<TNotification, ValueTask> handler) : this(channel, default, handler) { }

    /// <summary>Creates a subscription from all three parts. No argument is validated.</summary>
    /// <param name="channel">Channel name; stored as given.</param>
    /// <param name="request">Request payload to store in <see cref="Request"/>.</param>
    /// <param name="handler">Callback to store in <see cref="Handler"/>; may be null.</param>
    public Subscription(string channel, TRequest request, Func<TNotification, ValueTask> handler)
    {
        // The properties are get-only, so this constructor is the single place they are set.
        Channel = channel;
        Request = request;
        Handler = handler;
    }

    /// <summary>Channel name supplied at construction.</summary>
    public string Channel { get; }

    /// <summary>Request payload supplied at construction, or <c>default</c> when none was given.</summary>
    public TRequest Request { get; }

    /// <summary>Notification callback supplied at construction, or null when none was given.</summary>
    public Func<TNotification, ValueTask> Handler { get; }
}
/// <summary>
/// Handle identifying one subscription. Wraps a <see cref="Guid"/> and compares by that value.
/// </summary>
public class SubscriptionToken : IEquatable<SubscriptionToken>
{
    /// <summary>Token wrapping <see cref="Guid.Empty"/>; <see cref="IsValid"/> returns false for it.</summary>
    public static readonly SubscriptionToken Empty = new(Guid.Empty);

    private readonly Guid _guid;

    private SubscriptionToken(Guid guid) => _guid = guid;

    // Leaves _guid at Guid.Empty. Nothing in this class calls it.
    private SubscriptionToken() { }

    /// <summary>The wrapped identifier.</summary>
    public Guid Value => _guid;

    /// <summary>Returns true unless the wrapped identifier is <see cref="Guid.Empty"/>.</summary>
    public bool IsValid() => _guid != Guid.Empty;

    /// <summary>Creates a token around a freshly generated <see cref="Guid"/>.</summary>
    public static SubscriptionToken New() => new(Guid.NewGuid());

    /// <summary>Returns true when <paramref name="other"/> is non-null and wraps the same identifier.</summary>
    // `is not null` bypasses the overloaded != operator, which would route back
    // through the equality comparer just to perform a null check.
    public bool Equals(SubscriptionToken other) => other is not null && _guid == other._guid;

    /// <summary>Returns true when <paramref name="obj"/> is a token wrapping the same identifier.</summary>
    // Must pass the typed pattern variable: passing `obj` would bind to this very
    // overload again and recurse until the stack overflows.
    public override bool Equals(object obj) => obj is SubscriptionToken token && Equals(token);

    /// <summary>Hash derived only from the wrapped identifier, consistent with <see cref="Equals(SubscriptionToken)"/>.</summary>
    public override int GetHashCode() => HashCode.Combine(_guid);

    /// <summary>Formats the identifier with the "N" specifier (32 hex digits, no separators).</summary>
    public override string ToString() => _guid.ToString("N");

    /// <summary>Value equality; two null references compare equal.</summary>
    public static bool operator ==(SubscriptionToken left, SubscriptionToken right) =>
        EqualityComparer<SubscriptionToken>.Default.Equals(left, right);

    /// <summary>Negation of <c>==</c>.</summary>
    public static bool operator !=(SubscriptionToken left, SubscriptionToken right) => !(left == right);
}
/// <summary>
/// Set of subscriptions stored in a <see cref="ConcurrentDictionary{TKey, TValue}"/>, keyed by
/// the <see cref="Guid"/> of the <see cref="SubscriptionToken"/> issued when each one was added.
/// </summary>
/// <typeparam name="TRequest">Request type of the stored subscriptions.</typeparam>
/// <typeparam name="TNotification">Notification type of the stored subscriptions.</typeparam>
public class SubscriptionCollection<TRequest, TNotification>
    where TNotification : class
{
    private readonly ConcurrentDictionary<Guid, Subscription<TRequest, TNotification>> _subscriptions = new();

    /// <summary>Looks up the subscription registered under <paramref name="token"/>.</summary>
    /// <param name="token">Token returned by <see cref="TryAdd"/>; null is treated as "not found".</param>
    /// <param name="subscription">The stored subscription, or null when the lookup fails.</param>
    /// <returns>True when a subscription is stored under the token.</returns>
    public bool TryGet(SubscriptionToken token, out Subscription<TRequest, TNotification> subscription)
    {
        if (token is null)
        {
            subscription = null;
            return false;
        }

        return _subscriptions.TryGetValue(token.Value, out subscription);
    }

    /// <summary>Stores <paramref name="subscription"/> under a newly generated token.</summary>
    /// <param name="subscription">Subscription to store.</param>
    /// <param name="token">
    /// The token the subscription was stored under, or <see cref="SubscriptionToken.Empty"/>
    /// when it could not be stored.
    /// </param>
    /// <returns>True when the subscription was stored.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public bool TryAdd(Subscription<TRequest, TNotification> subscription, out SubscriptionToken token)
    {
        if (subscription == null) throw new ArgumentNullException(nameof(subscription));

        var candidate = SubscriptionToken.New();
        if (_subscriptions.TryAdd(candidate.Value, subscription))
        {
            token = candidate;
            return true;
        }

        // The key was already present: hand back the invalid token, never one
        // that would address somebody else's subscription.
        token = SubscriptionToken.Empty;
        return false;
    }

    /// <summary>Removes the subscription registered under <paramref name="token"/>.</summary>
    /// <param name="token">Token returned by <see cref="TryAdd"/>; null is treated as "not found".</param>
    /// <returns>True when a subscription was removed.</returns>
    public bool TryRemove(SubscriptionToken token)
    {
        return token is not null && _subscriptions.TryRemove(token.Value, out _);
    }

    /// <summary>Lazily enumerates every stored, non-null subscription.</summary>
    public IEnumerable<Subscription<TRequest, TNotification>> GetSubscriptions()
    {
        foreach (var entry in _subscriptions)
        {
            if (entry.Value != null)
            {
                yield return entry.Value;
            }
        }
    }

    /// <summary>Lazily enumerates the handler of every stored subscription that has one.</summary>
    public IEnumerable<Func<TNotification, ValueTask>> GetHandlers()
    {
        foreach (var entry in _subscriptions)
        {
            var handler = entry.Value?.Handler;
            if (handler != null)
            {
                yield return handler;
            }
        }
    }

    /// <summary>Removes all stored subscriptions.</summary>
    public void Clear() => _subscriptions.Clear();
}
/// <summary>
/// Registry of subscriptions grouped by channel name. Each channel maps to its own
/// <see cref="SubscriptionCollection{TRequest, TNotification}"/>.
/// </summary>
/// <typeparam name="TRequest">Request type of the managed subscriptions.</typeparam>
/// <typeparam name="TNotification">Notification type passed to handlers.</typeparam>
public sealed class SubscriptionManager<TRequest, TNotification>
    where TNotification : class
{
    private readonly ConcurrentDictionary<string, SubscriptionCollection<TRequest, TNotification>> _subscriptions = new();

    /// <summary>Registers <paramref name="handler"/> on <paramref name="channel"/>.</summary>
    /// <param name="channel">Channel name; must not be null, empty, or whitespace.</param>
    /// <param name="handler">Callback to register; must not be null.</param>
    /// <returns>
    /// The token identifying the new subscription, or <see cref="SubscriptionToken.Empty"/>
    /// when the channel's collection could not store it.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="channel"/> is null, empty, or whitespace, or <paramref name="handler"/> is null.
    /// </exception>
    public SubscriptionToken Subscribe(string channel, Func<TNotification, ValueTask> handler)
    {
        if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel));
        if (handler == null) throw new ArgumentNullException(nameof(handler));

        var subscription = new Subscription<TRequest, TNotification>(channel, handler);

        // GetOrAdd instead of TryGetValue + TryAdd: with the two-step form, two callers
        // subscribing to a new channel at once could each build a collection, and the one
        // whose TryAdd lost would return a token for a collection nobody can reach.
        var collection = _subscriptions.GetOrAdd(
            channel,
            _ => new SubscriptionCollection<TRequest, TNotification>());

        collection.TryAdd(subscription, out SubscriptionToken token);
        return token;
    }

    /// <summary>Removes the subscription identified by <paramref name="token"/> from <paramref name="channel"/>.</summary>
    /// <returns>True when the channel exists and the subscription was removed from it.</returns>
    public bool Unsubscribe(string channel, SubscriptionToken token) =>
        _subscriptions.TryGetValue(channel, out var collection) && collection.TryRemove(token);

    /// <summary>Looks up a single subscription.</summary>
    /// <returns>The subscription, or null when the channel or the token is unknown.</returns>
    public Subscription<TRequest, TNotification> GetSubscription(string channel, SubscriptionToken token)
    {
        if (_subscriptions.TryGetValue(channel, out var collection)
            && collection.TryGet(token, out Subscription<TRequest, TNotification> subscription))
        {
            return subscription;
        }

        return null;
    }

    /// <summary>Returns the subscriptions on <paramref name="channel"/>, or an empty sequence for an unknown channel.</summary>
    public IEnumerable<Subscription<TRequest, TNotification>> GetSubscriptions(string channel)
    {
        return _subscriptions.TryGetValue(channel, out var collection)
            ? collection.GetSubscriptions()
            : Enumerable.Empty<Subscription<TRequest, TNotification>>();
    }

    /// <summary>Returns the handlers registered on <paramref name="channel"/>, or an empty sequence for an unknown channel.</summary>
    public IEnumerable<Func<TNotification, ValueTask>> GetHandlers(string channel)
    {
        return _subscriptions.TryGetValue(channel, out var collection)
            ? collection.GetHandlers()
            : Enumerable.Empty<Func<TNotification, ValueTask>>();
    }

    /// <summary>Drops every channel together with its subscriptions.</summary>
    public void Reset() => _subscriptions.Clear();
}