using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
// Entry point: wires up an Rx pipeline in which console input "o" opens and "c" closes
// a window over a timer-driven feed, with a 3 second timeout fallback.
// NOTE(review): the method braces, several statement terminators, and the end of the
// final statement are not visible in this excerpt — confirm against the full file.
// NOTE(review): `Unit` and `TimeSpan` are used below, but no `using System;` /
// `using System.Reactive;` is visible here — presumably provided elsewhere; verify.
public static void Main()
// Scheduler handed to ConsoleInputObservable (defined outside this excerpt).
var backgroundScheduler = new NewThreadScheduler();
// Publish().RefCount() lets the filters below share one subscription to the console source.
var consoleInputObservable = ConsoleInputObservable(backgroundScheduler).Publish().RefCount();
// Console inputs equal to "o".
var downloadInitiatedObservable = consoleInputObservable.Where(x => x == "o");
// The same "o" inputs mapped to Unit, used as the window-opening signal.
var downloadEngagedObservable = downloadInitiatedObservable.Select(x => Unit.Default);
// Console inputs equal to "c" mapped to Unit, used as the window-closing signal.
var downloadDisengagedObservable = consoleInputObservable.Where(x => x == "c")
.Select(x => Unit.Default);
// Windows the raw console input: a window opens on each "o" and closes on the next "c".
// NOTE(review): no terminating `;` is visible for this statement — a continuation line
// appears to be missing from this excerpt.
var downloadWindow = consoleInputObservable.Window(downloadEngagedObservable, x => downloadDisengagedObservable)
// Prefixes each "o" input with "timeout ".
// NOTE(review): Repeat(1) presumably leaves the sequence unchanged, and this variable is
// not referenced in the visible code — confirm whether it is still needed.
var timeoutObservable = downloadInitiatedObservable.Select(x => "timeout " + x).Repeat(1);
// Emits "g1 0" through "g1 9" at one-second intervals, then never completes
// (Concat with Observable.Never keeps the sequence open after the tenth value).
var duplicatedAgentInstancesParametersObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => "g1 " + x).Take(10).Concat(Observable.Never<string>());
// Pairs each timer value with the most recent "o" input and formats both into one string.
var withLatestFrom = duplicatedAgentInstancesParametersObservable.WithLatestFrom(downloadInitiatedObservable, (f, e) => $"f: {f} e:{e}");
// Windows the combined feed with the same "o"-opens / "c"-closes signals as above.
// NOTE(review): no terminating `;` is visible here either — a continuation line appears
// to be missing, so the element type of eventsFeed cannot be determined from this excerpt.
var eventsFeed = withLatestFrom
.Window(downloadEngagedObservable, x => downloadDisengagedObservable)
// Throttles eventsFeed with a 3 second due time.
// NOTE(review): timeoutTrigger is not referenced in the visible code.
var timeoutTrigger = eventsFeed.Throttle(TimeSpan.FromSeconds(3));
// Applies a 3 second Timeout to eventsFeed; the fallback sequence is a single "TIMEOUT"
// value passed through the same open/close windowing and flattened with Switch().
// NOTE(review): the statement continues beyond the visible excerpt (no `;` here).
var subscribe = eventsFeed
.Timeout(TimeSpan.FromSeconds(3), Observable.Return("TIMEOUT").Window(downloadEngagedObservable, x => downloadDisengagedObservable).Switch())