using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Threading.Tasks;
/// <summary>
/// Holds items until a per-item delay has elapsed, then hands each one to a
/// caller-supplied callback. A <see cref="Timer"/> polls for due items at a
/// fixed interval, so delivery happens on a timer callback thread and is only
/// as precise as that interval.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class DelayQueue<T> : IDisposable
{
    // Due time (UTC ticks) -> items that become due at that instant. The value is an
    // immutable list so that adding to a bucket and removing the whole bucket
    // never observe a half-updated collection.
    private readonly ConcurrentDictionary<long, ImmutableList<T>> _delayedItems = new();
    private readonly Action<T> _enqueueAction;
    private readonly Timer _timer;
    private readonly CancellationTokenSource _cancellationTokenSource = new();

    // Captured once at construction: a CancellationToken can still be queried after
    // its source has been disposed, whereas CancellationTokenSource.Token cannot.
    private readonly CancellationToken _shutdownToken;

    // 0 = live, 1 = Dispose() has run. Guards against double disposal.
    private int _disposed;

    /// <summary>
    /// Creates the queue and immediately starts the polling timer.
    /// </summary>
    /// <param name="enqueueAction">Callback invoked once for every item whose delay has elapsed.</param>
    /// <param name="processInterval">How often to look for due items; one second when omitted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="enqueueAction"/> is null.</exception>
    public DelayQueue(Action<T> enqueueAction, TimeSpan? processInterval = null)
    {
        _enqueueAction = enqueueAction ?? throw new ArgumentNullException(nameof(enqueueAction));
        _shutdownToken = _cancellationTokenSource.Token;

        // The timer is created last: its first tick is scheduled with no delay and
        // may run before this constructor returns, so every field it reads must be set.
        _timer = new Timer(ProcessQueue, null, TimeSpan.Zero, processInterval ?? TimeSpan.FromSeconds(1));
    }

    /// <summary>
    /// Schedules <paramref name="item"/> for delivery once <paramref name="delay"/> has elapsed.
    /// </summary>
    /// <param name="item">The item to deliver later.</param>
    /// <param name="delay">
    /// Time to wait before delivery. Zero or negative delays make the item due on the
    /// next poll. Delays that would exceed the range of <see cref="DateTime"/> are clamped
    /// to <see cref="DateTime.MaxValue"/> instead of throwing.
    /// </param>
    /// <exception cref="ObjectDisposedException">The queue has already been disposed.</exception>
    public void Enqueue(T item, TimeSpan delay)
    {
        if (_shutdownToken.IsCancellationRequested)
        {
            throw new ObjectDisposedException(nameof(DelayQueue<T>));
        }

        var nowTicks = DateTime.UtcNow.Ticks;
        var maxTicks = DateTime.MaxValue.Ticks;

        // DateTime.Add throws when the result leaves the representable range
        // (e.g. TimeSpan.MaxValue / TimeSpan.MinValue), so clamp by hand instead.
        // Neither branch can overflow a long: nowTicks is always in [0, maxTicks].
        var executeAtTicks = delay.Ticks >= maxTicks - nowTicks
            ? maxTicks
            : Math.Max(DateTime.MinValue.Ticks, nowTicks + delay.Ticks);

        _delayedItems.AddOrUpdate(
            executeAtTicks,
            _ => ImmutableList.Create(item),
            (_, existingList) => existingList.Add(item));
    }

    /// <summary>
    /// Timer callback: removes every bucket whose due time has passed and delivers
    /// its items through the callback, earliest bucket first.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown by the callback are not caught here and propagate out of the
    /// timer callback. Nothing here prevents two ticks from running at the same time
    /// if a pass takes longer than the polling interval; a bucket is still delivered
    /// only once because it is removed atomically before delivery.
    /// </remarks>
    /// <param name="state">Unused; the timer is created with a null state.</param>
    private void ProcessQueue(object state)
    {
        if (_shutdownToken.IsCancellationRequested)
        {
            return;
        }

        var nowTicks = DateTime.UtcNow.Ticks;

        // Keys is a point-in-time snapshot, so its Count is stable while we copy.
        var keys = _delayedItems.Keys;
        var dueKeys = new long[keys.Count];
        var dueCount = 0;
        foreach (var key in keys)
        {
            if (key <= nowTicks)
            {
                dueKeys[dueCount++] = key;
            }
        }

        // Dictionary enumeration order is unrelated to the key value; sort so that
        // buckets due in the same pass are delivered in chronological order.
        Array.Sort(dueKeys, 0, dueCount);

        for (var i = 0; i < dueCount; i++)
        {
            // TryRemove fails if another pass already claimed this bucket.
            if (!_delayedItems.TryRemove(dueKeys[i], out var items))
            {
                continue;
            }

            foreach (var item in items)
            {
                // Stop delivering as soon as the queue is disposed.
                if (_shutdownToken.IsCancellationRequested)
                {
                    return;
                }

                _enqueueAction(item);
            }
        }
    }

    /// <summary>
    /// Stops the polling timer and releases the cancellation source. Items that are
    /// still waiting are not delivered. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Only the first caller performs the teardown; Cancel() on an already
        // disposed source would throw.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cancellationTokenSource.Cancel();
        _timer.Dispose();
        _cancellationTokenSource.Dispose();
    }
}