using System.Threading.Channels;
using System.Threading.Tasks;
public long Ticks { get; set; }
public DateTime DateTime { get; set; }
public decimal Price { get; set; }
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _tag;
public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
_cancellationTokenSource = cancellationTokenSource;
public async Task StartConsuming()
await foreach (var message in _tickQueue.Reader.ReadAllAsync(
cancellationToken: _cancellationTokenSource.Token))
Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
public Producer(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource)
_cancellationTokenSource = cancellationTokenSource;
public async Task StartProducing()
for (int i = 0; i < 10; i++)
await _tickQueue.Writer.WriteAsync(new DummyData()
Ticks = DateTime.Now.Ticks,
Price = Convert.ToDecimal(r.NextDouble() * r.Next(100, 105))
}, _cancellationTokenSource.Token);
await Task.Delay(r.Next(50, 500));
internal class MultipleConsumersEg
private static Channel<DummyData> tickQueue;
private static readonly CancellationTokenSource TickQueueCancellationTokenSource = new CancellationTokenSource();
public static async Task Main(string[] args)
tickQueue = Channel.CreateUnbounded<DummyData>();
Producer p = new Producer(tickQueue, TickQueueCancellationTokenSource);
ConsumerOne consumerOne = new ConsumerOne(tickQueue, TickQueueCancellationTokenSource, "ONE");
consumerOne.StartConsuming();