using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
using System.Runtime.Serialization;
// Demo script: pushes a ReplaySubject<int> through ObservableExtensions.BufferWhile and
// prints both the raw values and the buffered results to the console.
var src = new ReplaySubject<int>();
// The second argument is a comparer lambda that always returns true. The remaining
// arguments (10 ms, 1, false) are presumably the timeout, the maximum buffer size and
// the sliding flag; only part of BufferWhile's parameter list is visible here, so
// confirm the order against the full signature.
var srcBuffered = ObservableExtensions.BufferWhile(src, (value, eventValue) => true, TimeSpan.FromMilliseconds(10), 1, false);
// Echo every raw source value.
src.Subscribe(e => Console.WriteLine("Src: " + e));
// Three separate subscriptions to the same buffered observable, each printing with its
// own prefix.
// NOTE(review): `i` and `e2` are not declared in the visible lines and the lambda
// bodies are not closed here; the surrounding statements appear to be missing.
srcBuffered.Subscribe(e => {
Console.WriteLine("SrcBuffered: (" + i + ") " + e2);
srcBuffered.Subscribe(e => {
Console.WriteLine("SrcBuffered2: (" + i + ") " + e2);
srcBuffered.Subscribe(e => {
Console.WriteLine("SrcBuffered3: (" + i + ") " + e2);
/// <summary>
/// Static helpers for observable sequences.
/// </summary>
public static class ObservableExtensions
/// <summary>
/// Turns <paramref name="observable"/> into an observable of lists. Three internal
/// subjects (comparer, timer and count ticks) are merged into a single boundary stream.
/// NOTE(review): the call that consumes that boundary stream is not visible here;
/// presumably it closes the current buffer on every boundary signal. Confirm.
/// </summary>
/// <param name="observable">The source sequence.</param>
/// <param name="continueComparer">
/// Called with the stored last value and the incoming value; a false result makes the
/// comparer condition below evaluate to true.
/// </param>
/// <param name="maxsize">Defaults to <c>int.MaxValue</c>; presumably the maximum buffer length (confirm).</param>
public static IObservable<IList<T>> BufferWhile<T>(
IObservable<T> observable,
Func<T, T, bool> continueComparer,
int maxsize = int.MaxValue,
// Starts out empty; it is the value handed to continueComparer as the first argument.
var lastValue = Prodrive.Maybe<T>.None;
// One subject per boundary source.
var comparerTick = new Subject<int>();
var timerTick = new Subject<int>();
var countTick = new Subject<int>();
// Every timer tick sets the timerTicked flag; the ticks are then throttled by `timeout`.
var throttledTimer = timerTick.Do(x => timerTicked = true).Throttle(timeout);
// A single stream carrying comparer, throttled-timer and count boundary signals.
var boundaries = comparerTick.Merge(throttledTimer).Merge(countTick);
// Per-value callback. NOTE(review): the statements executed inside the two conditions
// below are not visible here.
Action<T> produceBoundaries =
// True while no timer tick is pending, or always when `sliding` is set.
if (!timerTicked || sliding)
// True only when a last value exists and continueComparer rejects the (last, current)
// pair; an empty lastValue yields false.
if (lastValue.Match(x => !continueComparer(x, value), () => false))
// Callbacks that clear the timer flag and the element counter; the list argument is unused.
Action<IList<T>> resetTimer = list => timerTicked = false;
Action<IList<T>> resetCount = list => count = 0;
/// <summary>
/// Optional-value struct: either holds a value of type <typeparamref name="T"/> or is
/// empty ("None").
/// </summary>
public struct Maybe<T> : IEquatable<Maybe<T>>
// The wrapped value; other members read it only after checking isSome.
private readonly T value;
// True when a value is present.
private readonly bool isSome;
// Constructor fragment (its header is not visible here): a null value leaves isSome false.
this.isSome = (object) value != null;
/// <summary>
/// The Maybe that represents the absence of a value; other members of this struct
/// return it whenever no value is present. The body is not visible in this view.
/// </summary>
public static Maybe<T> None
// NOTE(review): this throw presumably belongs to a value accessor (for example the
// Value property used elsewhere in this struct) whose declaration is not visible here.
throw new InvalidOperationException("Attempted to retrieve the value of None.");
/// <summary>
/// Implicitly wraps a raw <typeparamref name="T"/> in a <see cref="Maybe{T}"/> by
/// delegating to <c>EliminateNull</c>.
/// </summary>
/// <param name="value">The value to wrap.</param>
public static implicit operator Maybe<T>(T value) => EliminateNull(value);
/// <summary>Equality operator for two Maybe values. The body is not visible in this view.</summary>
public static bool operator ==(Maybe<T> i1, Maybe<T> i2)
/// <summary>Inequality operator for two Maybe values. The body is not visible in this view.</summary>
public static bool operator !=(Maybe<T> i1, Maybe<T> i2)
/// <summary>
/// Action-based overload of Match. The body is not visible; presumably it invokes
/// <paramref name="some"/> with the value when one is present and <paramref name="none"/>
/// otherwise. Confirm.
/// </summary>
public void Match(Action<T> some, Action none)
/// <summary>
/// Projects this Maybe to a <typeparamref name="U"/>, choosing the delegate to run by
/// whether a value is present.
/// </summary>
/// <param name="some">Invoked with the wrapped value when one is present.</param>
/// <param name="none">Invoked when this Maybe is empty.</param>
/// <returns>The result of whichever delegate ran.</returns>
public U Match<U>(Func<T, U> some, Func<U> none)
{
    if (this.isSome)
    {
        return some(this.Value);
    }

    return none();
}
/// <summary>
/// Projects this Maybe to a <typeparamref name="U"/>, falling back to a ready-made
/// value when empty.
/// </summary>
/// <param name="some">Invoked with the wrapped value when one is present.</param>
/// <param name="none">Returned as-is when this Maybe is empty.</param>
/// <returns>The result of <paramref name="some"/>, or <paramref name="none"/>.</returns>
public U Match<U>(Func<T, U> some, U none)
{
    if (this.isSome)
    {
        return some(this.Value);
    }

    return none;
}
/// <summary>
/// Chains a Maybe-returning function onto this instance.
/// </summary>
/// <param name="func">Invoked with the wrapped value when one is present.</param>
/// <returns>
/// The result of <paramref name="func"/>, or <c>Maybe&lt;U&gt;.None</c> when this Maybe
/// is empty (in which case <paramref name="func"/> is not called).
/// </returns>
public Maybe<U> Bind<U>(Func<T, Maybe<U>> func)
{
    if (this.isSome)
    {
        return func(this.Value);
    }

    return Maybe<U>.None;
}
/// <summary>
/// Folds the wrapped value, if any, into an accumulator.
/// </summary>
/// <param name="func">Combines the wrapped value with <paramref name="seed"/>.</param>
/// <param name="seed">The starting accumulator; returned unchanged when this Maybe is empty.</param>
/// <returns><c>func(value, seed)</c> when a value is present, otherwise <paramref name="seed"/>.</returns>
public U Fold<U>(Func<T, U, U> func, U seed)
{
    if (this.isSome)
    {
        return func(this.Value, seed);
    }

    return seed;
}
/// <summary>
/// Typed equality for <see cref="IEquatable{T}"/>: compares this Maybe with
/// <paramref name="other"/>.
/// </summary>
public bool Equals(Maybe<T> other)
// True when exactly one side holds a value. NOTE(review): the statement this guards
// is not visible here; presumably it returns false. Confirm.
if (this.isSome ^ other.isSome)
// Equal when both sides are empty; otherwise compare the wrapped values with
// object.Equals. The && short-circuit means Value is not read when both are empty.
return this.IsNone && other.IsNone || object.Equals((object) this.Value, (object) other.Value);
/// <summary>
/// Untyped equality. Returns false, rather than throwing, when <paramref name="obj"/>
/// is null or is not a <see cref="Maybe{T}"/>; otherwise defers to the typed
/// <c>Equals(Maybe&lt;T&gt;)</c> overload.
/// </summary>
/// <param name="obj">The object to compare with this instance.</param>
/// <returns>True when <paramref name="obj"/> is an equal <see cref="Maybe{T}"/>.</returns>
public override bool Equals(object obj)
{
    // A direct unboxing cast would throw NullReferenceException for null and
    // InvalidCastException for any other type; the type pattern rejects both.
    return obj is Maybe<T> other && this.Equals(other);
}
/// <summary>
/// Hash code that agrees with equality: the wrapped value's hash when a value is
/// present, 0 when empty.
/// </summary>
public override int GetHashCode()
{
    if (this.isSome)
    {
        return this.value.GetHashCode();
    }

    return 0;
}
/// <summary>
/// Renders this Maybe as <c>None&lt;TypeName&gt;</c> when empty, or
/// <c>Some&lt;TypeName&gt; value</c> when a value is present.
/// </summary>
public override string ToString()
{
    string typeName = typeof(T).Name;

    if (!this.isSome)
    {
        return string.Format("None<{0}>", (object) typeName);
    }

    return string.Format("Some<{0}> {1}", (object) typeName, (object) this.value);
}
/// <summary>
/// Filters this Maybe: keeps the value only when one is present and it satisfies
/// <paramref name="predicate"/>.
/// </summary>
/// <param name="predicate">Test applied to the wrapped value; not called when empty.</param>
/// <returns>A Maybe wrapping the same value, or <c>None</c>.</returns>
public Maybe<T> Where(Func<T, bool> predicate)
{
    // && short-circuits, so the predicate runs only when a value is present.
    bool keep = this.isSome && predicate(this.value);
    if (keep)
    {
        return new Maybe<T>(this.value);
    }

    return Maybe<T>.None;
}
/// <summary>
/// Maps the wrapped value through <paramref name="selector"/>.
/// </summary>
/// <param name="selector">Projection applied to the wrapped value; not called when empty.</param>
/// <returns>
/// A new Maybe built from the projected value, or <c>Maybe&lt;U&gt;.None</c> when this
/// Maybe is empty.
/// </returns>
public Maybe<U> Select<U>(Func<T, U> selector)
{
    if (this.isSome)
    {
        return new Maybe<U>(selector(this.value));
    }

    return Maybe<U>.None;
}
/// <summary>
/// Two-step projection: runs <paramref name="selector"/> to obtain an intermediate
/// Maybe, then combines its value with the original through
/// <paramref name="resultSelector"/>.
/// </summary>
public Maybe<V> SelectMany<U, V>(Func<T, Maybe<U>> selector, Func<T, U, V> resultSelector)
// NOTE(review): `obj` is declared on lines not visible here; presumably it is this
// instance's wrapped value, read after an emptiness check. Confirm.
Maybe<U> maybe = selector(obj);
// An empty intermediate result yields None; otherwise wrap the combined value.
return !maybe.isSome ? Maybe<V>.None : new Maybe<V>(resultSelector(obj, maybe.value));
/// <summary>
/// Exposes this Maybe as a sequence of <typeparamref name="T"/>. The body is not
/// visible in this view; presumably it yields the wrapped value when present and
/// nothing otherwise. Confirm.
/// </summary>
public IEnumerable<T> AsEnumerable()
/// <summary>
/// Converts a possibly-null <typeparamref name="T"/> into a <see cref="Maybe{T}"/>.
/// </summary>
/// <param name="value">The raw value.</param>
/// <returns><c>None</c> for null; otherwise a Maybe wrapping <paramref name="value"/>.</returns>
internal static Maybe<T> EliminateNull(T value)
{
    if ((object) value != null)
    {
        return new Maybe<T>(value);
    }

    return Maybe<T>.None;
}