using System.Threading.Tasks;
using System.Reactive.Linq;
using System.Collections.Generic;
using System.Reactive.Subjects;
/// <summary>
/// Console demo for the <c>MagicBuffer</c> extension: creates a
/// <c>BehaviorSubject&lt;long&gt;</c>, subscribes to its buffered output and
/// prints timestamped progress messages so emission timing can be read off
/// the console.
/// </summary>
/// <remarks>
/// NOTE(review): in this text the method has no braces, and the statements
/// that would actually push values into <c>subject</c> (announced by the
/// "emitting ..." messages) are not present - confirm against the original
/// source before relying on this listing.
/// </remarks>
public static void Main()
// Start-of-run marker. The "hh" format specifier is the 12-hour clock, and
// "fff" adds milliseconds.
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " beginning");
// Subject seeded with an initial value of 0.
var subject = new BehaviorSubject<long>(0);
// Subscribe to the MagicBuffer output using a 200 ms delay argument. Every
// list delivered is printed as a timestamp, " Received ", and the list items
// joined with commas.
subject.MagicBuffer(TimeSpan.FromMilliseconds(200)).Subscribe(x => Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " Received " + string.Join(",", x)));
// Announces that 1, 2 and 3 are about to be emitted.
// NOTE(review): no OnNext calls are visible after this message - presumably
// they were dropped from this listing; verify.
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 1,2,3");
// Announces the expected follow-up emission; no wait/sleep between the
// messages is visible here.
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " should emit empty in a bit");
// Announces that 4 is about to be emitted (the emitting call itself is not
// visible here either).
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 4");
public static IObservable<IList<T>> MagicBuffer<T>(this IObservable<T> source, TimeSpan delay)
return Observable.Create<IList<T>>(o => {
DateTime lastEmitTime = DateTime.MinValue;
DateTime lastCallTime = DateTime.MinValue;
IList<T> values = new List<T>();
Task continuation = Task.CompletedTask;
Action<Task> callback = _ => {
if(DateTime.Now - lastCallTime >= delay){
lastCallTime = DateTime.Now;
Action<Task> recur = null;
recur = _ => Task.Delay(delay).ContinueWith(ignored => {
continuation = continuation.ContinueWith(callback).ContinueWith(recur);
if(DateTime.Now - lastEmitTime >= delay){
lastEmitTime = lastCallTime = DateTime.Now;
o.OnNext(new List<T>{v});