using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
/// <summary>
/// Synchronous program entry point. The visible lines block on an awaitable
/// and then print a completion message.
/// </summary>
public static void Main()
// Tail of a call expression whose beginning is not visible in this excerpt.
// GetAwaiter().GetResult() blocks the calling thread until the awaited
// operation finishes and rethrows its exception, if any.
}).GetAwaiter().GetResult();
// Final message with the current UTC timestamp, written after the blocking call returns.
Console.WriteLine($"{DateTime.UtcNow}: We are done.");
/// <summary>
/// Shared pseudo-random source used to randomize the heartbeat interval.
/// Declared <c>readonly</c> because the reference is never reassigned.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="Random"/> instances are not documented as
/// thread-safe; confirm that <see cref="HeartbeatInterval"/> is never read
/// concurrently from several scheduler threads.
/// </remarks>
private static readonly Random _rand = new Random();

/// <summary>
/// Gets a newly randomized heartbeat interval. Every read draws a new value,
/// so two consecutive reads normally differ.
/// </summary>
/// <value>
/// A duration of at least 100 ms and strictly less than 1000 ms (the upper
/// bound passed to <see cref="Random.Next(int, int)"/> is exclusive).
/// </value>
public static TimeSpan HeartbeatInterval => TimeSpan.FromMilliseconds(_rand.Next(100, 1000));
/// <summary>
/// Builds an observable pipeline that invokes <c>PublishHeartBeatAsync</c>,
/// delays each result by a randomized interval, and stops once the local
/// cancellation token is cancelled. Only part of the method is visible in
/// this excerpt.
/// </summary>
private static async Task RunMyCodeAsync()
// Cancellation source whose token is passed to the publisher and also drives TakeUntil below.
// NOTE(review): no Dispose/using for this source is visible here — confirm it is released.
var _cts = new CancellationTokenSource();
// Defers building the inner observable; the factory wraps one PublishHeartBeatAsync call
// (with _cts.Token) via Observable.FromAsync. The receiver of .Defer is not visible here.
.Defer(() => Observable.FromAsync(_ => PublishHeartBeatAsync(_cts.Token)))
// Per-element delay: each element is held until Observable.Timer(HeartbeatInterval) fires.
// HeartbeatInterval is read on every selector call, so each delay is drawn anew.
.Delay(_ => Observable.Timer(HeartbeatInterval))
// Ends the sequence as soon as the inner observable emits; that observable pushes
// Unit.Default from a callback registered on _cts.Token, i.e. when the token is cancelled.
.TakeUntil(Observable.Create<Unit>(obs => _cts.Token.Register(() => obs.OnNext(Unit.Default))))
// Callback that logs a UTC-timestamped completion message
// (presumably the completion handler of a subscribe call — the call site is not visible).
() => Console.WriteLine($"{DateTime.UtcNow}: Observable completed."
/// <summary>
/// Creates an observable whose subscription schedules a self-rescheduling
/// heartbeat publish on the default scheduler. Only part of the body is
/// visible in this excerpt.
/// </summary>
/// <param name="ct">Token passed to <c>PublishHeartBeatAsync</c> and checked for cancellation.</param>
private static IObservable<Unit> GetHeartbeatObservable(CancellationToken ct) =>
Observable.Create<Unit>(observer =>
// Scheduled work item; the int state parameter is named `_` (a discard-style name).
async Task<IDisposable> RecursiveHeartbeatPublish(IScheduler scheduler, int _)
// Publish one heartbeat, passing the caller's token.
await PublishHeartBeatAsync(ct)
// Cancellation check; the body of this branch is not visible in this excerpt.
if (ct.IsCancellationRequested)
// Error text naming PublishHeartBeatAsync (presumably an exception message — confirm).
$"Unspecified error occurred in {nameof(PublishHeartBeatAsync)}."
// Logs, with a UTC timestamp, that this function is about to throw.
Console.WriteLine($"{DateTime.UtcNow}: Throwing from RecursiveHeartbeatPublish...");
// Queues another run of this same function one second from now, with state 0.
// NOTE(review): the value returned by this Schedule call is not captured on this line.
scheduler.Schedule(0, TimeSpan.FromSeconds(1), RecursiveHeartbeatPublish);
// The first run is scheduled on Scheduler.Default with state 0 and no delay argument.
var scheduler = Scheduler.Default;
// The IDisposable from Schedule is returned (presumably as the subscription's teardown — confirm).
return scheduler.Schedule(0, RecursiveHeartbeatPublish);
/// <summary>
/// Publishes a single heartbeat. The visible part of the body writes a
/// timestamped greeting to the console.
/// </summary>
/// <param name="ct">Cancellation token supplied by the caller; its use is not visible in this excerpt.</param>
public static async Task PublishHeartBeatAsync(CancellationToken ct)
// Logs the current UTC time formatted as HH:mm:ss.fff, followed by a fixed message.
Console.WriteLine($"{DateTime.UtcNow.ToString("HH:mm:ss.fff")}: Hi from PublishHeartBeatAsync");