using System.Collections.Generic;
using System.Threading.Tasks;
// Demo entry point: creates one RateLimiter and starts two Task.Run workers that each
// call WaitAsync five times, printing a line after every call, then awaits both workers.
// NOTE(review): brace-only and other short lines are not visible in this excerpt, so the
// nesting described in these comments is inferred from statement order — confirm against
// the full file.
public static async Task Main()
// Constructor arguments are (count: 3, timeSpan: 3 seconds); both workers share this instance.
var rateLimiter = new RateLimiter(3, TimeSpan.FromSeconds(3));
// First worker.
var task1 = Task.Run(async () =>
for (var i = 0; i < 5; i++)
// Await the limiter before each print.
await rateLimiter.WaitAsync();
// Prints the current managed thread id and the local wall-clock time.
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
// Second worker: the same statements as the first, using the same limiter instance.
var task2 = Task.Run(async () =>
for (var i = 0; i < 5; i++)
await rateLimiter.WaitAsync();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
// Finish only after both workers have completed.
await Task.WhenAll(task1, task2);
// Token received by the constructor; WaitAsync and ProcessAsync call
// ThrowIfCancellationRequested on it, and ProcessAsync passes it to Task.Delay.
private readonly CancellationToken _cancellationToken;
// Compared against _completed.Count to decide whether another caller may be admitted.
// NOTE(review): its assignment is not visible in this excerpt — presumably set from the
// constructor's `count` argument; confirm.
private readonly int _count;
// DateTime.Now timestamps, one enqueued each time a caller is admitted.
// ProcessAsync compares the oldest entry (Peek) against a cutoff of now minus _timeSpan.
private readonly Queue<DateTime> _completed = new Queue<DateTime>();
// Completion sources that are dequeued and completed with TrySetResult(true) or
// TrySetCanceled(). NOTE(review): the statement that enqueues into this queue is not
// visible in this excerpt — presumably WaitAsync adds its waiter here; confirm.
private readonly Queue<TaskCompletionSource<bool>> _queue = new Queue<TaskCompletionSource<bool>>();
// NOTE(review): presumably the lock target guarding _completed and _queue; no lock
// statement is visible in this excerpt — confirm.
private readonly object _sync = new object ();
// Subtracted from DateTime.Now in ProcessAsync to compute the window cutoff.
// NOTE(review): its assignment is not visible in this excerpt — presumably set from the
// constructor's `timeSpan` argument; confirm.
private readonly TimeSpan _timeSpan;
// 0/1 flag driven through Interlocked: CompareExchange flips it 0 -> 1, Exchange resets it to 0.
private int _isProcessing;
// Constructor. Stores the optional cancellation token (defaults to `default`, i.e. a
// token that is never canceled).
// NOTE(review): `count` and `timeSpan` are not used in any visible line; presumably they
// are copied into _count and _timeSpan by statements missing from this excerpt — confirm.
public RateLimiter(int count, TimeSpan timeSpan, CancellationToken cancellationToken = default)
_cancellationToken = cancellationToken;
// NOTE(review): no enclosing header is visible for the statements below. They read like a
// separate cancellation-cleanup routine rather than constructor logic — confirm against
// the full file.
// While cancellation is requested and waiters remain queued, take the oldest waiter and
// mark its task as canceled.
while (_cancellationToken.IsCancellationRequested && _queue.Any())
_queue.Dequeue().TrySetCanceled();
// Atomically reset the processing flag to 0.
Interlocked.Exchange(ref _isProcessing, 0);
// Admits queued waiters while fewer than _count admission timestamps are recorded, then
// delays until the oldest recorded admission falls outside the window.
// NOTE(review): brace-only and other short lines (the `try`, any enclosing loop or lock,
// the declaration of `delay`, and several statement bodies) are not visible in this
// excerpt; the comments below describe only the visible statements — confirm against the
// full file.
private async Task ProcessAsync()
// Throws OperationCanceledException once the token has been canceled.
_cancellationToken.ThrowIfCancellationRequested();
// Window cutoff: admission timestamps earlier than this are older than _timeSpan.
// NOTE(review): DateTime.Now is local time and shifts across DST/time-zone changes;
// DateTime.UtcNow may be the safer clock for interval arithmetic — confirm intent.
var time = DateTime.Now - _timeSpan;
// Loops while the oldest recorded admission is older than the cutoff.
// NOTE(review): the loop body is not visible — presumably it dequeues that entry; confirm.
while (_completed.Any() && _completed.Peek() < time)
// While capacity remains and callers are waiting: complete the oldest waiter's task with
// `true` and record the admission time.
while (_completed.Count < _count && _queue.Any())
_queue.Dequeue().TrySetResult(true);
_completed.Enqueue(DateTime.Now);
// True when no waiter is queued or no admission timestamp is recorded.
// NOTE(review): the body of this branch is not visible — presumably it stops processing; confirm.
if (!_queue.Any() || !_completed.Any())
// Time remaining until the oldest recorded admission ages past the cutoff
// (Peek - (now - _timeSpan)), plus a 20 ms margin.
delay = (_completed.Peek() - time) + TimeSpan.FromMilliseconds(20);
// Asynchronous wait; cancellation of the token ends it with an OperationCanceledException.
await Task.Delay(delay, _cancellationToken);
// Cancellation is caught here. NOTE(review): the handler body is not visible in this excerpt.
catch (OperationCanceledException)
// Returns a ValueTask for the caller to await before proceeding.
// NOTE(review): brace-only and other short lines (any lock, the fast-path return, and the
// statement that enqueues `tcs`) are not visible in this excerpt; the comments below
// describe only the visible statements — confirm against the full file.
public ValueTask WaitAsync()
// Throws OperationCanceledException once the token has been canceled.
_cancellationToken.ThrowIfCancellationRequested();
// Fast-path condition: fewer than _count admissions are recorded and nobody is queued ahead.
if (_completed.Count < _count && !_queue.Any())
// Record this admission's timestamp.
// NOTE(review): DateTime.Now is local time; see whether DateTime.UtcNow is intended.
_completed.Enqueue(DateTime.Now);
// Completion source backing the task handed to the caller.
// NOTE(review): created without TaskCreationOptions.RunContinuationsAsynchronously, so the
// caller's continuation may run inline on the thread that calls TrySetResult/TrySetCanceled
// — confirm this is acceptable.
var tcs = new TaskCompletionSource<bool>();
// Hand back a ValueTask wrapping the completion source's task.
return new ValueTask(tcs.Task);
// Atomically flips _isProcessing from 0 to 1; the condition is true only for the caller
// that performed the flip.
// NOTE(review): this statement follows a `return`, so it likely belongs to a separate
// routine whose header and body are not visible — presumably it starts ProcessAsync; confirm.
if (Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0)