using System.Reactive.Subjects;
using System.Threading.Tasks;
using System.Reactive.Linq;
var source = new Subject<int>();
var loader = new ReactiveLoader<int>(source, Load);
loader.IsLoadingObservable.Subscribe(val => Console.WriteLine($"Is Loading: {val}"));
Console.WriteLine("Program Finished");
static async Task Load(int id, CancellationToken ct)
Console.WriteLine($"Loading started: {id}");
await Task.Delay(100, ct);
Console.WriteLine($"Loading finished: {id}");
catch (TaskCanceledException)
Console.WriteLine($"Loading cancelled: {id}");
public class ReactiveLoader<T> : IDisposable
private readonly BehaviorSubject<bool> _isLoading = new(false);
private readonly Subject<Unit> _completes = new();
private readonly Subject<T> _reloads = new ();
private readonly IDisposable _subscription;
public bool IsLoading => _isLoading.Value;
public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged();
public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
_subscription = observable
_completes.OnNext(Unit.Default);
.Do(_ => _isLoading.OnNext(true))
.Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
.Do(_ => _isLoading.OnNext(false))
_completes.OnNext(Unit.Default);