using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demo entry point: builds two TransformBlock<int, int> stages (xlsBlock and
// pdfBlock), links both into a single order-restoring block, and links that
// block into an ActionBlock that prints an "Uploading document" line.
// NOTE(review): braces and parts of the lambda/loop bodies are not visible in
// this view; the comments below describe only the lines that are shown.
public static async Task Main()
// First stage; only the duration computation of its lambda is visible.
var xlsBlock = new TransformBlock<int, int>(document =>
// For non-negative document numbers this is 300, 600 or 900
// (presumably milliseconds of simulated work -- TODO confirm the unit).
int duration = 300 + document % 3 * 300;
// Second stage; only the duration computation of its lambda is visible.
var pdfBlock = new TransformBlock<int, int>(document =>
// For non-negative document numbers this is 100, 300, 500, 700 or 900
// (presumably milliseconds of simulated work -- TODO confirm the unit).
int duration = 100 + document % 5 * 200;
// The document number itself is used as the ordering index, and the first
// expected index is 1.
var orderRestorer = CreateRestoreOrderBlock<int>(
indexSelector: document => document, startingIndex: 1L);
// Final stage: an async delegate that logs the current local time and the
// document number.
var uploader = new ActionBlock<int>(async document =>
Console.WriteLine("{0:HH:mm:ss.fff} Uploading document #{1}", DateTime.Now, document);
// Both producers feed the same target. No link options are passed here, so
// completion is not propagated by these two links; it is handled manually
// further down, once both producers have finished.
xlsBlock.LinkTo(orderRestorer);
pdfBlock.LinkTo(orderRestorer);
// This link does propagate completion, so completing orderRestorer
// eventually completes uploader.
orderRestorer.LinkTo(uploader, new DataflowLinkOptions { PropagateCompletion = true });
// Iterates the document numbers 1..10; the loop body is not visible here
// (presumably it posts each document to xlsBlock or pdfBlock -- TODO confirm).
foreach (var document in Enumerable.Range(1, 10))
// When both producers have completed, mark orderRestorer as complete.
// The continuation is registered without options, so it runs whatever the
// final state of the WhenAll task is; the resulting task is discarded.
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
.ContinueWith(_ => orderRestorer.Complete());
// Wait for the last stage of the pipeline to finish.
await uploader.Completion;
/// <summary>
/// Creates a TransformManyBlock{T, T} whose transform is the local function
/// Transform, which reads each item's index via <paramref name="indexSelector"/>
/// and works with a Dictionary{long, T} buffer and a running minIndex.
/// Judging by the name and the visible buffer/minIndex logic, the block is
/// meant to emit items in ascending index order; the loop body that would
/// establish this is not visible in this view.
/// </summary>
/// <param name="indexSelector">Returns the index of an item; must not be null.</param>
/// <param name="options">
/// Optional block options whose values are copied onto the
/// ExecutionDataflowBlockOptions passed to the created block.
/// </param>
/// <returns>The created block, typed as IPropagatorBlock{T, T}.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="indexSelector"/> is null.
/// </exception>
// NOTE(review): the body reads `startingIndex`, but no such parameter is
// visible in the signature below -- presumably its declaration is outside
// this view; confirm.
public static IPropagatorBlock<T, T> CreateRestoreOrderBlock<T>(
Func<T, long> indexSelector,
DataflowBlockOptions options = null)
if (indexSelector == null) throw new ArgumentNullException(nameof(indexSelector));
// Copy the general-purpose option values onto an execution-options instance.
// NOTE(review): `options` defaults to null and no null check is visible
// here; if these assignments are unguarded, calling without options would
// throw a NullReferenceException -- confirm a guard exists.
var executionOptions = new ExecutionDataflowBlockOptions();
executionOptions.CancellationToken = options.CancellationToken;
executionOptions.BoundedCapacity = options.BoundedCapacity;
executionOptions.EnsureOrdered = options.EnsureOrdered;
executionOptions.TaskScheduler = options.TaskScheduler;
executionOptions.MaxMessagesPerTask = options.MaxMessagesPerTask;
executionOptions.NameFormat = options.NameFormat;
// Items keyed by their index.
var buffer = new Dictionary<long, T>();
// Index looked up in the buffer by the loop below; starts at startingIndex.
long minIndex = startingIndex;
// Per-item transform. It captures buffer and minIndex with no visible
// locking; presumably this relies on the block processing one message at a
// time (MaxDegreeOfParallelism is never changed here) -- TODO confirm.
IEnumerable<T> Transform(T item)
long index = indexSelector(item);
// Indices below the starting index are rejected.
if (index < startingIndex)
throw new InvalidOperationException($"Index {index} is out of range.");
// NOTE(review): the condition guarding this throw is not visible in this
// view (presumably a comparison of index against minIndex -- confirm).
throw new InvalidOperationException($"Index {index} has been consumed.");
// An index that is already buffered is rejected as a duplicate.
if (buffer.ContainsKey(index))
throw new InvalidOperationException($"Index {index} is not unique.");
// Loops while the buffer holds an entry for minIndex; the loop body is not
// visible here (presumably it removes the entry, advances minIndex and
// yields the item -- TODO confirm).
while (buffer.TryGetValue(minIndex, out var minItem))
return new TransformManyBlock<T, T>(Transform, executionOptions);