using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates broadcasting a stream of integers from one producer to several
/// independent consumers, each reading from its own unbounded channel.
/// </summary>
/// <remarks>
/// The producer writes a value to every registered channel roughly every 250 ms
/// until a 3000 ms cancellation timeout elapses, then completes all channels so
/// that each consumer's <c>ReadAllAsync</c> enumeration ends. The method blocks
/// until the producer and both consumers have finished.
/// </remarks>
public static void Main()
{
    // Channels of all currently active consumers. Every access is guarded by
    // lock (channels) because consumers and the producer run on different threads.
    List<Channel<int>> channels = new();

    // Creates a dedicated channel for one consumer, registers it for broadcast,
    // runs the consumer body on the thread pool and unregisters the channel
    // when the body finishes.
    async Task CreateConsumer(Func<Channel<int>, Task> body)
    {
        var channel = Channel.CreateUnbounded<int>();
        lock (channels)
        {
            channels.Add(channel);
        }

        try
        {
            await Task.Run(() => body(channel)).ConfigureAwait(false);
        }
        finally
        {
            // Unregister even when the body throws; otherwise the producer would
            // keep writing into an unbounded channel that nobody reads anymore.
            lock (channels)
            {
                channels.Remove(channel);
            }
        }
    }

    Task consumer1 = CreateConsumer(async channel =>
    {
        await foreach (var i in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Consumer one: {i}");
        }
    });

    Task consumer2 = CreateConsumer(async channel =>
    {
        await foreach (var i in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Consumer two: {i}");
        }
    });

    // The token is cancelled automatically after 3000 ms, which stops the producer.
    using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));

    Task producer = Task.Run(async () =>
    {
        // NOTE(review): the loop header was missing from the source as received;
        // it is reconstructed here as an unbounded counter starting at 1 that is
        // left only through cancellation. Confirm the intended start value.
        for (int i = 1; ; i++)
        {
            lock (channels)
            {
                foreach (var channel in channels)
                {
                    channel.Writer.TryWrite(i);
                }
            }

            try
            {
                await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Timeout reached: stop producing.
                break;
            }
        }

        // Signal end-of-stream to every consumer that is still registered.
        lock (channels)
        {
            foreach (var channel in channels)
            {
                channel.Writer.Complete();
            }
        }
    });

    // Wait for the producer as well, so that a failure in it is observed
    // instead of being silently dropped.
    Task.WaitAll(producer, consumer1, consumer2);
}