using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
WarmObservable.WithMissesFrom<Int32>,
WarmObservable.WithDuplicatesFrom,
WarmObservable.WithCorrectButHighMemoryUsageFrom,
foreach (var strategy in strategies)
var observed = new List<Int32>();
var nums = new ObservableAddOnlyCollection<Int32>();
var warm = strategy(nums, nums.Added);
const Int32 max = 1_000_000;
Parallel.For(0, max, i =>
warm.Subscribe(observed.Add);
Console.WriteLine($"{observed.Count:N0} elements observed using {strategy.Method.Name}.");
public sealed class ObservableAddOnlyCollection<T> : IObservable<T>
private readonly List<T> items = new();
private event Action<T> added = _ => {};
public IObservable<T> Added => Observable.FromEvent<T>(h => added += h, h => added -= h);
public void Add(T item) { lock (items) items.Add(item); added.Invoke(item); }
public IDisposable Subscribe(IObserver<T> observer) => ToList().ToObservable().Subscribe(observer);
private List<T> ToList() { lock (items) return items.ToList(); }
public static class WarmObservable
public static IObservable<T> WithMissesFrom<T>(IObservable<T> cold, IObservable<T> hot)
public static IObservable<T> WithDuplicatesFrom<T>(IObservable<T> cold, IObservable<T> hot)
public static IObservable<T> WithCorrectButHighMemoryUsageFrom<T>(IObservable<T> cold, IObservable<T> hot)
return hot.Merge(cold).Distinct();
public static IObservable<T> HotFirst<T>(IObservable<T> cold, IObservable<T> hot)
return Observable.Merge([hot, cold], 2);
public static IObservable<T> ColdFirst<T>(IObservable<T> cold, IObservable<T> hot)
return Observable.Merge([cold, hot], 2);