using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Runtime.CompilerServices;
using System.Diagnostics;
public static class Program
public static async Task Main()
IAsyncEnumerable<int> query = Enumerable.Range(1, 10)
int delay = Random.Shared.Next(1, 11) * 50;
Print($"Task #{x} started, Delay: {delay}");
Print($"Task #{x} finished");
.AwaitResults(maxConcurrency: 2);
List<int> results = new();
Print($"Before await foreach");
await foreach (int item in query)
Print($"--Received: {item}");
Print($"Results: {String.Join(", ", results)}");
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
this IAsyncEnumerable<Task<TResult>> source,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
ArgumentNullException.ThrowIfNull(source);
throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
if (capacity == -1) capacity = Int32.MaxValue;
throw new ArgumentOutOfRangeException(nameof(capacity));
Channel<Task<TResult>> channel = Channel
.CreateBounded<Task<TResult>>(capacity);
using SemaphoreSlim semaphore = new(maxConcurrency - 1, maxConcurrency);
using CancellationTokenSource linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
Task producer = Task.Run(async () =>
await foreach (Task<TResult> task in source
.WithCancellation(linkedCts.Token).ConfigureAwait(false))
await channel.Writer.WriteAsync(OnCompletionRelease(task))
await semaphore.WaitAsync(linkedCts.Token)
catch (ChannelClosedException) { }
finally { channel.Writer.TryComplete(); }
async Task<TResult> OnCompletionRelease(Task<TResult> task)
try { return await task.ConfigureAwait(false); }
finally { semaphore.Release(); }
await foreach (Task<TResult> task in channel.Reader.ReadAllAsync()
yield return await task.ConfigureAwait(false);
await producer.ConfigureAwait(false);
try { linkedCts.Cancel(); }
if (!producer.IsCompleted)
channel.Writer.TryComplete();
await Task.WhenAny(producer).ConfigureAwait(false);
if (!channel.Reader.Completion.IsCompleted)
while (channel.Reader.TryRead(out Task<TResult> task))
await Task.WhenAny(task).ConfigureAwait(false);
Debug.Assert(channel.Reader.Completion.IsCompleted);
private static void Print(object value)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread.ManagedThreadId}] > {value}");