using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public static class Program
static async Task Main(string[] args)
var producer = CreateDeadTimeTransformBlock<int, string>(async (item, deadTime) =>
ConsolePrint($"Processing #{item}/A Started");
ConsolePrint($"Processing #{item}/A Finished");
await deadTime.MinimumDelayAsync(TimeSpan.FromSeconds(1));
ConsolePrint($"Processing #{item}/B Started");
ConsolePrint($"Processing #{item}/B Finished");
}, new ExecutionDataflowBlockOptions()
MaxDegreeOfParallelism = 2
var consumer = new ActionBlock<string>(item =>
ConsolePrint($"Received: {item}");
producer.LinkTo(consumer, new DataflowLinkOptions() { PropagateCompletion = true });
foreach (var item in Enumerable.Range(1, 5))
await producer.SendAsync(item);
await consumer.Completion;
ConsolePrint($"Finished");
private readonly SemaphoreSlim _semaphore;
private readonly CancellationToken _cancellationToken;
public DeadTime(SemaphoreSlim semaphore, CancellationToken cancellationToken)
_cancellationToken = cancellationToken;
public async Task MinimumDelayAsync(TimeSpan timeSpan)
await Task.Delay(timeSpan, _cancellationToken);
await _semaphore.WaitAsync();
public static TransformBlock<TInput, TOutput>
CreateDeadTimeTransformBlock<TInput, TOutput>(
Func<TInput, DeadTime, Task<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions)
var cancellationToken = dataflowBlockOptions.CancellationToken;
var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
var taskScheduler = dataflowBlockOptions.TaskScheduler;
if (maxDOP == DataflowBlockOptions.Unbounded)
semaphore = new SemaphoreSlim(Int32.MaxValue);
semaphore = new SemaphoreSlim(maxDOP, maxDOP);
dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;
dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
taskScheduler, maxDOP).ConcurrentScheduler;
var deadTime = new DeadTime(semaphore, cancellationToken);
var block = new TransformBlock<TInput, TOutput>(async item =>
await semaphore.WaitAsync(cancellationToken);
return await transform(item, deadTime);
}, dataflowBlockOptions);
dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP;
dataflowBlockOptions.TaskScheduler = taskScheduler;
public static void ConsolePrint(string line)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread.ManagedThreadId}] > {line}");