using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
namespace ConsoleApplication10
public static void Main(string[] args)
var userId = await GetUserId("Peter");
var invoices = ReadInvoicesFromDb(userId).Where(invoice => invoice.Total == 0)
.WhereAsync(invoice => IsInvoiceProcessed(invoice.Id));
await invoices.ForEachAsync2(async invoice => {
Console.WriteLine("Processing invoice: " + invoice.Id);
await Task.Delay(TimeSpan.FromMilliseconds(500));
Console.WriteLine("Processed invoice: " + invoice.Id);
Console.WriteLine("Exception");
static IObservable<Invoice> ReadInvoicesFromDb(string userId)
return Observable.Create<Invoice>(async obs => {
var batch = await Task.FromResult(Enumerable.Range(0, 10).Select(i => new Invoice() { Id = i }).ToList());
foreach (var invoice in batch) {
Console.WriteLine("OnNext " + inv.Id);
await Task.Delay(TimeSpan.FromMilliseconds(50));
await Task.Delay(TimeSpan.FromMilliseconds(500));
Console.WriteLine("OnCompleted");
private static Task ProcessInvoice(Invoice invoice)
return Task.FromResult(true);
private static Task UpdateInvoiceTotal(int id, int total)
return Task.FromResult(true);
private static int CalculateTotal(Invoice invoice)
Thread.Sleep(50 * invoice.Id);
private static async Task<bool> IsInvoiceProcessed(int id)
await Task.Delay(TimeSpan.FromMilliseconds(100));
private static Task<string> GetUserId(object name)
return Task.FromResult("Pan");
public decimal Total { get; set; }
public int Id { get; set; }
static class RxExtensions
public static IObservable<T> WhereAsync<T>(this IObservable<T> source, Func<T, Task<bool>> asyncPredicate)
return source.SelectMany(item => asyncPredicate(item)
public static Task ForEachAsync2<T>(this IObservable<T> source, Func<T, Task> handler)
return source.Select(item => Observable.FromAsync(() => handler(item)))