using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
private BufferBlock<int> _queue;
private ActionBlock<int> _consumer;
public Action<int, string> OnResult;
private void InitializeChain()
_queue = new BufferBlock<int>(new DataflowBlockOptions{BoundedCapacity = 5});
var consumerOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = 5, MaxDegreeOfParallelism = 5};
_consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);
_queue.LinkTo(_consumer, new DataflowLinkOptions{PropagateCompletion = true});
private async Task SomeIoWorkAsync(int x)
Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
OnResult?.Invoke(x, $"SomeResult {x}");
public async Task AddAsync(int data)
await _queue.SendAsync(data);
public static Task<string> WaitForConsumerAsync(Producer producer, int myId)
var tcs = new TaskCompletionSource<string>();
producer.OnResult += (id, result) =>
tcs.TrySetResult(result);
public static async Task Main()
var producer = new Producer();
var task = WaitForConsumerAsync(producer, myId);
#pragma warning disable 4014
for (int i = 1; i <= 20; i++)
await producer.AddAsync(i);
#pragma warning restore 4014
Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now i can finish happily");