using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
public static void Main()
CtsTest ctsTest = new CtsTest();
for (int i = 0; i < 200; i++) ctsTest.QueueJob();
Task.Run(() => ctsTest.ProcessQueueAsync(CancellationToken.None));
public class ProcessDescriptor
public Task Task { get; set; }
public JobDescriptor Descriptor { get; set; }
public CancellationTokenSource Cts { get; set; }
public class JobDescriptor
public string ProcessId { get; set; }
private List<ProcessDescriptor> _processQueue = new List<ProcessDescriptor>();
private ConcurrentDictionary<string, JobDescriptor> _waitQueue = new ConcurrentDictionary<string, JobDescriptor>();
public async Task ProcessQueueAsync(CancellationToken ct)
while (!ct.IsCancellationRequested)
foreach (var process in _processQueue.ToList())
if (process.Task.Status == TaskStatus.RanToCompletion)
_processQueue.Remove(process);
var runningProcesses = _processQueue.Select(f => f.Descriptor.ProcessId).Distinct();
var jobsToRun = _waitQueue.Where(f => !runningProcesses.Contains(f.Key))
.Take(20 - _processQueue.Count).ToList();
foreach (var job in jobsToRun)
var runProcess = new ProcessDescriptor()
Cts = new CancellationTokenSource(TimeSpan.FromMinutes(5))
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct, runProcess.Cts.Token);
runProcess.Task = RunJob1(job.Value, linkedCts.Token);
_processQueue.Add(runProcess);
Console.WriteLine("Worker stopped");
private async Task RunJob1(JobDescriptor descriptor, CancellationToken cancellationToken)
await Task.Delay(3000, cancellationToken);
Console.WriteLine("Job done: " + descriptor.ProcessId);
_waitQueue.TryRemove(descriptor.ProcessId, out descriptor);
private async Task RunJob2(JobDescriptor descriptor, CancellationTokenSource cancellationTokenSource)
await Task.Delay(3000, cancellationTokenSource.Token);
Console.WriteLine("Job done: " + descriptor.ProcessId);
if (_waitQueue.TryRemove(descriptor.ProcessId, out descriptor))
cancellationTokenSource.Dispose();
string processId = Guid.NewGuid().ToString();
JobDescriptor item = new JobDescriptor();
item.ProcessId = processId;
_waitQueue.AddOrUpdate(processId, item, (s, updaterDescriptor) => item);
Console.WriteLine("Job added to queue: " + processId);