using System.Collections.Generic;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Diagnostics;
public static class Program
public static async Task Main()
const int bufferTimeSpan = 1000;
const int bufferCount = 4;
IAsyncEnumerable<IList<int>> query = Enumerable.Range(1, 10)
int delay = Random.Shared.Next(0, 6) * 100;
Print($"Task #{x} started, Delay: {delay}");
Print($"Task #{x} finished");
.Buffer(TimeSpan.FromMilliseconds(bufferTimeSpan), bufferCount);
Print($"Buffer(TimeSpan.FromMilliseconds({bufferTimeSpan}), count: {bufferCount})");
Print($"Before await foreach");
List<int[]> results = new();
await foreach (IList<int> batch in query)
results.Add(batch.ToArray());
Print($"--Received: [{String.Join(", ", batch)}]");
Print($"Results: {String.Join(", ", results.Select(b => $"[{String.Join(", ", b)}]"))}");
public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
ArgumentNullException.ThrowIfNull(source);
if (timeSpan < TimeSpan.FromMilliseconds(1.0))
throw new ArgumentOutOfRangeException(nameof(timeSpan));
if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
using CancellationTokenSource linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
PeriodicTimer timer = null;
return timer.WaitForNextTickAsync().AsTask();
IAsyncEnumerator<TSource> enumerator = source
.GetAsyncEnumerator(linkedCts.Token);
Task<bool> moveNext = null;
List<TSource> buffer = new();
TSource[] ConsumeBuffer()
TSource[] array = buffer.ToArray();
if (buffer.Capacity > count) buffer.Capacity = count;
Task<bool> timerTickTask = StartTimer();
if (timerTickTask.IsCompleted)
Debug.Assert(timerTickTask.Result);
yield return ConsumeBuffer();
timerTickTask = StartTimer();
moveNext = enumerator.MoveNextAsync().AsTask();
if (!moveNext.IsCompleted)
Task completedTask = await Task.WhenAny(moveNext, timerTickTask)
if (ReferenceEquals(completedTask, timerTickTask))
Debug.Assert(timerTickTask.IsCompleted);
Debug.Assert(timerTickTask.Result);
yield return ConsumeBuffer();
timerTickTask = StartTimer();
Debug.Assert(moveNext.IsCompleted);
bool moved = await moveNext.ConfigureAwait(false);
TSource item = enumerator.Current;
if (buffer.Count == count)
yield return ConsumeBuffer();
timerTickTask = StartTimer();
if (buffer.Count > 0) yield return ConsumeBuffer();
try { linkedCts.Cancel(); }
if (moveNext is not null && !moveNext.IsCompleted)
await Task.WhenAny(moveNext).ConfigureAwait(false);
await enumerator.DisposeAsync().ConfigureAwait(false);
private static void Print(object value)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread.ManagedThreadId}] > {value}");