using System.Threading.Tasks;
using System.Reactive.Linq;
public static class Program
// NOTE(review): in this view the method's braces, the receiver of the `.Interval(...)`
// chain, and whatever terminates the chain (a subscribe/wait call) are not visible.
// Confirm against the full file before relying on the comments below.
/// <summary>
/// Demo entry point: builds a timed sequence, logs each produced value, passes it through
/// <c>ExhaustMap</c> with an asynchronous projection, logs each result, and finally
/// writes "Finished" to the console.
/// </summary>
public static void Main()
// Timer-driven source with a 200 ms period (presumably Observable.Interval from
// System.Reactive.Linq — the receiver is not visible here; TODO confirm).
.Interval(TimeSpan.FromMilliseconds(200))
// Side effect only: log every value the source produces before it is projected.
.Do(x => Console.WriteLine($"Produced #{x}"))
// Asynchronous projection that returns x unchanged after a delay: 500 ms when x is
// divisible by 3, otherwise 100 ms. The slow case is longer than the 200 ms source period,
// so source values can arrive while a projection is still pending; how ExhaustMap treats
// those values is decided by its own implementation (presumably they are ignored — verify).
.ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
// Side effect only: log every value that comes out of ExhaustMap.
.Do(x => Console.WriteLine($"--Result: {x}"))
// Printed by the statement following the pipeline; when this runs relative to the
// pipeline depends on how the chain is terminated, which is not visible here.
Console.WriteLine($"Finished");
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> function)
.Scan((Task: Task.FromResult<TResult>(default), Id: 0), (previous, item) =>
!previous.Task.IsCompleted ? previous : (function(item), unchecked(previous.Id + 1)))