using System.Reactive.Linq;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public static async Task Main()
using var subscription = Observable.Range(0, 55).Subscribe(p.InputObserver);
const int BATCHSIZE = 10;
private readonly TransformBlock<int, Item> _idToItemBlock;
private readonly ActionBlock<IEnumerable<Item>> _itemProcessor;
var executionBlockOptions = new ExecutionDataflowBlockOptions {
BoundedCapacity = BATCHSIZE
_idToItemBlock = new TransformBlock<int, Item>(IdToItemAsync, executionBlockOptions);
var itemBatcher = new BatchBlock<Item>(BATCHSIZE);
_itemProcessor = new ActionBlock<IEnumerable<Item>>(ProcessEmailsAsync, executionBlockOptions);
_idToItemBlock.LinkTo(itemBatcher, new DataflowLinkOptions { PropagateCompletion = true });
itemBatcher.LinkTo(_itemProcessor, new DataflowLinkOptions { PropagateCompletion = true });
private static async Task<Item> IdToItemAsync(int id)
private static async Task ProcessEmailsAsync(IEnumerable<Item> items)
var itemsList = items.ToList();
Console.WriteLine($"Processing Batch of Size {itemsList.Count}");
foreach ( var item in itemsList )
Console.WriteLine($"Processed {item}");
public IObserver<int> InputObserver => _idToItemBlock.AsObserver();
public Task Completion() => _itemProcessor.Completion;
public record Item( int Id );