using System.Threading.Tasks;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using static System.Console;
static async Task ThrottleWithScheduler()
WriteLine($"\n{nameof(ThrottleWithScheduler)}\n");
var sc = new CustomSyncContext();
var scheduler = new SynchronizationContextScheduler(sc);
var subj = new BehaviorSubject<string>("A");
.Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
.Throttle(TimeSpan.FromMilliseconds(500), scheduler)
.Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));
static async Task ThrottleWithObserveOn()
WriteLine($"\n{nameof(ThrottleWithObserveOn)}\n");
var sc = new CustomSyncContext();
var scheduler = new SynchronizationContextScheduler(sc);
var subj = new BehaviorSubject<string>("A");
.Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));
public static async Task Main()
await ThrottleWithScheduler();
await ThrottleWithObserveOn();
class CustomSyncContext : SynchronizationContext
private readonly Stopwatch _sw = Stopwatch.StartNew();
public long Elapsed { get { lock (_sw) { return _sw.ElapsedMilliseconds; } } }
public override void Post(SendOrPostCallback d, object? state)
WriteLine($"Scheduled on {Elapsed}ms");
Task.Delay(100).ContinueWith(
WriteLine($"Executed on {Elapsed}ms");
continuationOptions: TaskContinuationOptions.ExecuteSynchronously);