using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
// Demo entry point: creates a Subject<int> and wires up a chain of Rx operators whose
// Do(...) callbacks print each stage's values and completion to the console.
// NOTE(review): this excerpt appears to be missing lines -- no method braces are visible,
// the chain starting at the first `.Do` has no visible receiver, and `publications`
// is used below without a visible declaration. Confirm against the full source.
public static void Main()
var obs = new Subject<int>();
// Prints every value seen at this point ("to buffer ...") and a line on completion.
.Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
// Prints the items after buffering; x is handed to ShowContent, so it is a sequence here.
// NOTE(review): the buffering operator itself is not visible in this excerpt.
.Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
// Hands the following notifications over to the thread-pool scheduler.
.ObserveOn(ThreadPoolScheduler.Instance)
.Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
// Calls Test on each item and flattens the result obtained through ToEnumerable().
// NOTE(review): ToEnumerable() presumably blocks while it is enumerated -- confirm this is intended.
.SelectMany(x => Test(x).ToEnumerable())
// NOTE(review): this line ends the statement with `;`, yet the next line continues with `.Do` -- verify.
.Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
.Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
// Takes the first element of `publications` (declared outside the visible lines).
var nextpub = publications.FirstAsync();
/// <summary>
/// Renders a sequence as text: each element's ToString() result, joined with ", "
/// and wrapped in square brackets (for example "[1, 2, 3]"; an empty sequence gives "[]").
/// </summary>
/// <param name="e">Sequence to render; it is enumerated once by string.Join.</param>
/// <returns>The bracketed, comma-separated representation of <paramref name="e"/>.</returns>
// NOTE(review): a null reference-type element would make x.ToString() throw -- confirm callers never pass one.
// NOTE(review): the closing brace of this method is not visible in this excerpt.
static string ShowContent<T>(IEnumerable<T> e) {
return "[" + string.Join(", ", e.Select(x => x.ToString())) + "]";
/// <summary>
/// Prints the managed thread id of the calling thread to the console and returns
/// an observable built with Observable.Return(Unit.Default).
/// </summary>
/// <param name="_">Ignored; the visible body never reads it.</param>
/// <returns>The observable created by Observable.Return(Unit.Default).</returns>
// NOTE(review): the closing brace of this method is not visible in this excerpt.
static IObservable<Unit> Test(IList<int> _) {
// Diagnostic output showing which thread executes this call.
Console.WriteLine("> thread: {0}", Thread.CurrentThread.ManagedThreadId);
return Observable.Return(Unit.Default);