using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
/// <summary>
/// Static class that declares the asynchronous <c>Main</c> method.
/// </summary>
public static class Program
/// <summary>
/// Asynchronous <c>Main</c> method. The operator chain below groups items by
/// their <c>SSId</c> and buffers each group.
/// </summary>
/// <remarks>
/// NOTE(review): the expression the chain is applied to and the declaration of
/// <c>producers</c> are not visible here - confirm against the full method.
/// </remarks>
public static async Task Main()
// Partition the incoming items by their SSId.
.GroupBy(@object => @object.SSId)
// Within each group, collect items into buffers sized by producers.Length
// (presumably the count-based Buffer overload - confirm) and flatten the
// per-group buffers back into a single sequence.
.SelectMany(group => group.Buffer(producers.Length)));
// For each x, keep only the elements whose ProducerId equals 1.
.Select(x => x.Where(y => y.ProducerId == 1));
/// <summary>
/// Writes a one-line summary to the console: the number of items in
/// <paramref name="syncedProducts"/> and the <c>SSId</c> of its first item.
/// </summary>
/// <param name="syncedProducts">List of products to summarize.</param>
private static void WriteToConsole(IList<Product> syncedProducts)
// NOTE(review): First() is presumably LINQ's Enumerable.First, which throws
// on an empty sequence - confirm callers never pass an empty list.
Console.WriteLine($"Synced {syncedProducts.Count} producers on SSID {syncedProducts.First().SSId}");
// Shared random number generator; used to pick the millisecond period passed
// to Interval when building the producer streams.
private static Random _rnd = new Random();
// NOTE(review): the enclosing type declaration is not visible here; these are
// presumably the members of Product, whose object initializer sets both.

/// <summary>Identifier of the producer associated with this item.</summary>
public int ProducerId { get; set; }
/// <summary>Numeric id that WriteToConsole prints after "SSID".</summary>
public long SSId { get; set; }
// Backing field for ConnectedProducersEvent; stays null until the property
// getter assigns it on first access.
private static IObservable<IObservable<Product>[]> _connectedProducersEvent;
/// <summary>
/// Observable whose elements are arrays of <c>IObservable&lt;Product&gt;</c>.
/// The instance is created the first time the property is read and cached in
/// <c>_connectedProducersEvent</c>; later reads return the cached instance.
/// </summary>
/// <remarks>
/// NOTE(review): no lock is visible around the null check, so concurrent
/// first reads may each build an instance - confirm whether that matters.
/// </remarks>
private static IObservable<IObservable<Product>[]> ConnectedProducersEvent
// Build the observable only when nothing has been cached yet.
if (_connectedProducersEvent == null)
_connectedProducersEvent =
// Timer sequence with a 300 ms period.
.Interval(TimeSpan.FromMilliseconds(300))
// Log the value of each element as the "Count".
.Do(x => Console.WriteLine($"***** ConnectedProducersEvent! Count: {x} *****"))
// Timer sequence whose period is drawn from _rnd: 1-99 ms, because the
// upper bound of Random.Next(min, max) is exclusive.
.Interval(TimeSpan.FromMilliseconds(_rnd.Next(1, 100)))
// Wrap each value in a Product: the value becomes SSId and the producer is
// identified by producerId (declared outside the visible lines).
.Select(ssid => new Product { ProducerId = producerId, SSId = ssid })
// Log every product produced.
.Do(x => Console.WriteLine($"P{x.ProducerId}: SSId:{x.SSId}"))
return _connectedProducersEvent;