using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Random source created once for the whole run; Main passes it to every Simulate call.
private static Random rnd = new Random();
/// <summary>
/// Entry point: creates one engine and runs 100 simulation steps against it,
/// awaiting each step before starting the next.
/// </summary>
public static async Task Main()
{
    IEngine engine = new Engine();

    var step = 0;
    while (step < 100)
    {
        await Simulate(rnd, engine);
        step++;
    }
}
// Running counter used to number generated work items; post-incremented (without synchronization) by Simulate.
public static int _workItems = 0;
/// <summary>
/// Creates one work item with a random effort, waits a short random delay,
/// and hands the item to one of the engine's two entry points.
/// </summary>
/// <param name="rnd">Random source for the effort, the priority flag and the delay.</param>
/// <param name="engine">Engine that receives the work item.</param>
public static async Task Simulate(Random rnd, IEngine engine)
// Name is "WorkItem " followed by the counter formatted with at least three digits;
// effort is 750..998 ms (the upper bound of Random.Next is exclusive).
var workItem = new WorkItem( $"WorkItem {_workItems++:000}", TimeSpan.FromMilliseconds(rnd.Next(750,999)));
// True for values 0..9 out of 0..99, i.e. about one item in ten.
var isPriority = rnd.Next(100) < 10;
// Pause 0..49 ms before dispatching.
await Task.Delay(TimeSpan.FromMilliseconds(rnd.Next(0,50)));
// NOTE(review): the beginning of this conditional expression (condition and opening parenthesis) is not
// present in the visible source. Presumably it tests isPriority — confirm, and confirm whether the
// resulting task is awaited or intentionally discarded.
? engine.ProcessVIPAsync(workItem)
: engine.ProcessAsync(workItem));
/// <summary>A unit of simulated work.</summary>
/// <param name="Name">Label printed when the item has been processed.</param>
/// <param name="Effort">How long SimulatedClient delays to simulate processing this item.</param>
public record WorkItem (string Name, TimeSpan Effort);
// NOTE(review): the interface declarations that own the next three members are not present in the
// visible source. SimulatedClient implements IClient and Engine implements IEngine (with both
// ProcessAsync and ProcessVIPAsync), so presumably the first member belongs to IClient and the
// following two to IEngine — confirm.
// Each member takes one work item and returns a Task.
Task ProcessAsync(WorkItem message);
Task ProcessAsync(WorkItem message);
Task ProcessVIPAsync(WorkItem message);
/// <summary>
/// Contract for one processing lane of the engine; Engine holds two instances (normal and priority).
/// </summary>
// NOTE(review): Engine also calls Shutdown() on its IEngineBlock fields, but no such member is
// visible in this declaration — confirm it is declared here.
public interface IEngineBlock
/// <summary>Submits a work item to this block.</summary>
Task ProcessAsync(WorkItem message);
/// <summary>
/// IEngine implementation that keeps two separate engine blocks: one for regular
/// work items and one for VIP work items.
/// </summary>
public class Engine : IEngine
// Lane for regular items, constructed with a client count of 2.
private readonly IEngineBlock _normalEngine = new EngineBlock(2);
// Lane for VIP items, constructed with a client count of 1.
private readonly IEngineBlock _prioEngine = new EngineBlock(1);
/// <summary>Routes a regular work item to the normal engine block.</summary>
/// <param name="message">The work item to process.</param>
/// <returns>The task returned by the normal block's <c>ProcessAsync</c>.</returns>
public Task ProcessAsync(WorkItem message) => _normalEngine.ProcessAsync(message);
/// <summary>Routes a VIP work item to the priority engine block.</summary>
/// <param name="message">The work item to process.</param>
/// <returns>The task returned by the priority block's <c>ProcessAsync</c>.</returns>
public Task ProcessVIPAsync(WorkItem message) => _prioEngine.ProcessAsync(message);
// NOTE(review): no method header precedes this statement in the visible source — presumably it is
// the body of Engine's own shutdown method; confirm.
// Calls Shutdown() on both engine blocks and returns a task that completes when both have completed.
return Task.WhenAll(_normalEngine.Shutdown(), _prioEngine.Shutdown());
/// <summary>
/// IEngineBlock implementation that distributes work items over a list of client wrappers.
/// </summary>
private class EngineBlock : IEngineBlock
// Client wrappers owned by this block; filled by the constructor.
private readonly List<ClientWrapper> _clients = new();
// Produces the index into _clients to use for the next work item; assigned in the constructor.
private readonly Func<int> _iterator;
/// <summary>
/// Creates <paramref name="clientCount"/> client wrappers and selects how the next
/// wrapper index is chosen.
/// </summary>
/// <param name="clientCount">Number of client wrappers to create.</param>
public EngineBlock(int clientCount)
{
    // _clients starts empty, so filling until Count reaches clientCount adds exactly clientCount wrappers
    // (and none when clientCount is zero or negative).
    while (_clients.Count < clientCount)
    {
        _clients.Add(new ClientWrapper());
    }

    // Only rotate through indices when there is more than one wrapper; otherwise always use slot 0.
    if (clientCount > 1)
    {
        _iterator = NextIndex;
    }
    else
    {
        _iterator = () => 0;
    }
}
/// <summary>Hands the work item to the client wrapper chosen by <c>NextClient</c>.</summary>
/// <param name="message">The work item to process.</param>
/// <returns>The task returned by the selected wrapper's <c>ProcessAsync</c>.</returns>
public Task ProcessAsync(WorkItem message)
{
    var target = NextClient();
    return target.ProcessAsync(message);
}
// NOTE(review): no method header precedes this statement in the visible source — presumably it is the
// body of EngineBlock.Shutdown(), which Engine invokes on its blocks; confirm.
// Calls Shutdown() on every client wrapper (ToList forces all calls to happen immediately) and
// returns a task that completes when all of the returned tasks have completed.
return Task.WhenAll(_clients.Select(x => x.Shutdown()).ToList());
/// <summary>Returns the client wrapper at the index produced by the configured iterator.</summary>
private ClientWrapper NextClient()
{
    var slot = _iterator();
    return _clients[slot];
}
// Index that the next NextIndex() call will hand out; always kept in [0, _clients.Count).
private int _currentIndex = 0;

/// <summary>
/// Returns the current round-robin index and advances the stored index by one,
/// wrapping back to 0 at <c>_clients.Count</c>.
/// </summary>
/// <remarks>
/// Reading the field and then calling <c>Interlocked.Exchange</c> with a value derived from that
/// read is not atomic as a whole: two concurrent callers can observe the same value, compute the
/// same successor, and one advance is lost. A compare-and-swap retry loop publishes the successor
/// only if the field still holds the value it was computed from.
/// </remarks>
/// <returns>The index that was current at the moment this call's swap succeeded.</returns>
private int NextIndex()
{
    while (true)
    {
        int observed = Volatile.Read(ref _currentIndex);
        int successor = (observed + 1) % _clients.Count;

        // CompareExchange returns the value it found; equality means our swap was applied.
        if (Interlocked.CompareExchange(ref _currentIndex, successor, observed) == observed)
        {
            return observed;
        }
    }
}
/// <summary>
/// Pairs one client with an ActionBlock that receives work items and passes them to that client.
/// </summary>
private class ClientWrapper
// Dataflow block that accepts work items and invokes ProcessAtClientAsync for each of them.
private readonly ActionBlock<WorkItem> _actionBlock;
// The client that actually processes the work items.
private readonly IClient _client;
// NOTE(review): these assignments target readonly fields, so they presumably form the body of the
// ClientWrapper constructor, whose header is not present in the visible source — confirm.
// Every wrapper gets its own SimulatedClient instance.
_client = new SimulatedClient();
// Items sent to the block are handled by ProcessAtClientAsync; MaxDegreeOfParallelism = 4 lets the
// block handle up to four items concurrently.
_actionBlock = new ActionBlock<WorkItem>(ProcessAtClientAsync, new ExecutionDataflowBlockOptions{ MaxDegreeOfParallelism = 4 });
/// <summary>
/// Handler the action block runs for each work item; delegates to the wrapped client.
/// </summary>
/// <param name="message">The work item taken from the block.</param>
/// <returns>The task returned by the client's <c>ProcessAsync</c>.</returns>
private Task ProcessAtClientAsync(WorkItem message)
{
    return _client.ProcessAsync(message);
}
/// <summary>Offers a work item to this wrapper's action block.</summary>
/// <param name="message">The work item to enqueue.</param>
/// <returns>
/// The task produced by <c>SendAsync</c>. Its boolean result (whether the block accepted the item)
/// is not visible to callers because the method is typed as plain <see cref="Task"/>.
/// </returns>
public Task ProcessAsync(WorkItem message)
{
    return _actionBlock.SendAsync(message);
}
// NOTE(review): no method header precedes this statement in the visible source — presumably it is the
// body of ClientWrapper.Shutdown(), which EngineBlock invokes on each wrapper; confirm.
// NOTE(review): Completion only finishes after the block has been told to complete (or has faulted).
// No _actionBlock.Complete() call is visible here — confirm one exists, otherwise this task never ends.
return _actionBlock.Completion;
/// <summary>
/// IClient implementation that simulates work by waiting for the work item's Effort
/// and then printing a line to the console.
/// </summary>
public class SimulatedClient : IClient
// Number of SimulatedClient instances created so far; only used to build Name.
// Incremented with a plain ++ (no synchronization).
private static int count = 0;
/// <summary>Display name "Client N", where N is this instance's 1-based creation order.</summary>
public string Name { get; } = $"Client {++count}";
/// <summary>
/// Simulates processing of a work item: takes one slot from the <c>_slots</c> counter, waits for
/// the item's <c>Effort</c>, prints a completion line, and gives the slot back.
/// </summary>
/// <param name="message">The work item to process; its <c>Effort</c> is the delay duration.</param>
/// <returns>A task that completes after the delay has elapsed and the line has been written.</returns>
public async Task ProcessAsync(WorkItem message)
{
    // The value captured here (slots left right after taking ours) is what gets printed later.
    var slots = Interlocked.Decrement(ref _slots);
    try
    {
        await Task.Delay(message.Effort);
        Console.WriteLine($"{Name} Processed {message.Name} | Slots free: {slots}");
    }
    finally
    {
        // Give the slot back even if the delay or the console write throws; otherwise the
        // counter would stay one too low for the rest of the run.
        Interlocked.Increment(ref _slots);
    }
}