using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Reactive.Threading.Tasks;
private static void WriteLine(string path)
public static async Task<int> VerySlowOperation(int i, TimeSpan timeout)
WriteLine($"Slow operation for {i} - before wait.");
await Task.Delay(timeout);
WriteLine($"Slow operation for {i} - after wait.");
public static async Task<int> VeryQuickOperation(int i)
WriteLine($"Quick operation for {i} - after wait.");
public static async Task Main()
WriteLine("Starting tests...");
var list = new List<int> { 1, 2, 3 };
var observable = list.ToObservable();
var resultObservable = observable
.Select(x => VerySlowOperation(x, TimeSpan.FromMilliseconds((3 - x) * 100)).ToObservable())
.Select(x => VeryQuickOperation(x).ToObservable())
.Merge(maxConcurrent: 1);
var semaphore = new SemaphoreSlim(0);
void ProcessResult(int result)
WriteLine($"Result is : {result}");
resultObservable.Subscribe(ProcessResult, () => semaphore.Release());
await semaphore.WaitAsync(5000);