using System.Threading.Tasks;
using System.Threading.Channels;
using System.Diagnostics;
public static void Main()
Channel<int> channel = new DoubleCapacityChannel<int>(10, 5);
Task producer = Task.Run(async () =>
Stopwatch stopwatch = Stopwatch.StartNew();
Connection connection = new();
while (stopwatch.ElapsedMilliseconds < 1000)
await channel.Writer.WaitToWriteAsync();
Print($"Opening connection -->");
item = connection.GetNextItem();
Print($"Produced #{item}");
if (!channel.Writer.TryWrite(item)) break;
Print($"Closing connection <--");
await channel.Writer.WriteAsync(item);
Print($"Producer completed");
channel.Writer.Complete();
} catch (Exception ex) { channel.Writer.Complete(ex); }
Task consumer = Task.Run(async () =>
await foreach (var item in channel.Reader.ReadAllAsync())
Print($"Consuming: {item}");
Task.WaitAll(producer, consumer);
public void Open() => Thread.Sleep(100);
public void Close() => Thread.Sleep(100);
public int GetNextItem() => ++_index;
public sealed class DoubleCapacityChannel<T> : Channel<T>
private readonly Channel<T> _channel;
private readonly Channel<int> _channelLow;
private readonly int _lowCapacity;
public DoubleCapacityChannel(int highCapacity, int lowCapacity)
throw new ArgumentOutOfRangeException(nameof(highCapacity));
if (lowCapacity < 1 || lowCapacity > highCapacity)
throw new ArgumentOutOfRangeException(nameof(lowCapacity));
_lowCapacity = lowCapacity;
_channel = Channel.CreateBounded<T>(highCapacity);
Debug.Assert(_channel.Reader.CanCount);
_channelLow = Channel.CreateBounded<int>(1);
this.Writer = new ChannelWriter(this);
this.Reader = new ChannelReader(this);
private class ChannelWriter : ChannelWriter<T>
private readonly DoubleCapacityChannel<T> _parent;
public ChannelWriter(DoubleCapacityChannel<T> parent) => _parent = parent;
public override bool TryComplete(Exception error = null)
bool success = _parent._channel.Writer.TryComplete(error);
if (success) _parent._channelLow.Writer.TryComplete(error);
public override bool TryWrite(T item)
bool success = _parent._channel.Writer.TryWrite(item);
if (!success || _parent._channel.Reader.Count >= _parent._lowCapacity)
_parent._channelLow.Writer.TryWrite(0);
public override async ValueTask WriteAsync(T item,
CancellationToken cancellationToken = default)
cancellationToken.ThrowIfCancellationRequested();
if (this.TryWrite(item)) break;
if (await _parent._channel.Writer.WaitToWriteAsync(
cancellationToken).ConfigureAwait(false)) continue;
catch (Exception ex) { throw new ChannelClosedException(ex); }
throw new ChannelClosedException();
public override ValueTask<bool> WaitToWriteAsync(
CancellationToken cancellationToken = default)
=> _parent._channelLow.Writer.WaitToWriteAsync(cancellationToken);
private class ChannelReader : ChannelReader<T>
private readonly DoubleCapacityChannel<T> _parent;
public ChannelReader(DoubleCapacityChannel<T> parent) => _parent = parent;
public override Task Completion => _parent._channel.Reader.Completion;
public override bool CanCount => _parent._channel.Reader.CanCount;
public override int Count => _parent._channel.Reader.Count;
public override bool TryRead(out T item)
bool success = _parent._channel.Reader.TryRead(out item);
if (!success || _parent._channel.Reader.Count < _parent._lowCapacity)
_parent._channelLow.Reader.TryRead(out _);
public override ValueTask<T> ReadAsync(
CancellationToken cancellationToken = default)
=> _parent._channel.Reader.ReadAsync(cancellationToken);
public override ValueTask<bool> WaitToReadAsync(
CancellationToken cancellationToken = default)
=> _parent._channel.Reader.WaitToReadAsync(cancellationToken);
public override bool CanPeek => _parent._channel.Reader.CanPeek;
public override bool TryPeek(out T item)
=> _parent._channel.Reader.TryPeek(out item);
private static void Print(object value)
Console.WriteLine($@"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread
.ManagedThreadId}] > {value}");