using System.Reactive.Linq;
/// <summary>
/// Common base interface for the message kinds used in this file;
/// <see cref="ITextMessage"/> and <see cref="IBinaryMessage"/> both derive from it.
/// </summary>
public interface IMessage {
/// <summary>
/// A message whose content is exposed through a <c>TextReader</c>.
/// </summary>
public interface ITextMessage: IMessage {
/// <summary>Read-only accessor for the reader over this message's text.</summary>
TextReader Reader { get; }
/// <summary>
/// A message carrying binary content.
/// </summary>
// NOTE(review): MessagesAsTextHelper.TextReader accesses a `Stream` member on this
// interface and wraps it in a UTF-8 StreamReader — confirm the member declaration.
public interface IBinaryMessage: IMessage {
/// <summary>
/// Extension helpers that dispatch on the concrete kind of an <see cref="IMessage"/>
/// (text or binary), treating the interface hierarchy like a sum type.
/// </summary>
public static class SumTypeHelper {
/// <summary>
/// Projects every element of <paramref name="messages"/> to a
/// <typeparamref name="TResult"/> through <c>Select</c>, using a handler chosen by message kind.
/// </summary>
/// <typeparam name="TResult">Result type shared by both handlers.</typeparam>
/// <param name="messages">Source sequence of messages.</param>
/// <param name="textMessage">Function mapping an <see cref="ITextMessage"/> to a result.</param>
/// <param name="binaryMessage">Function mapping an <see cref="IBinaryMessage"/> to a result.</param>
/// <returns>An observable of the projected results.</returns>
/// <exception cref="NotSupportedException">
/// Raised inside the projection when a message reaches the final fallback statement;
/// presumably for messages that are neither text nor binary — confirm against the dispatch conditions.
/// </exception>
public static IObservable<TResult> Switch<TResult>(
this IObservable<IMessage> messages,
Func<ITextMessage, TResult> textMessage,
Func<IBinaryMessage, TResult> binaryMessage)
=> messages.Select(m => {
// Binary case: hand the message to the binary handler.
// NOTE(review): `bm` is presumably `m` narrowed to IBinaryMessage by a type test — confirm.
return binaryMessage(bm);
// Fallback: the message names the runtime type; `m?.` keeps building the text safe when m is null.
throw new NotSupportedException($"Message type: {m?.GetType()} not supported");
/// <summary>
/// Extension helpers that add keep-alive elements to an observable sequence.
/// </summary>
public static class KeepAliveHelper {
/// <summary>
/// Merges a sequence with keep-alive elements derived from its own elements.
/// </summary>
/// <typeparam name="TSource">Element type of the source and of the keep-alive elements.</typeparam>
/// <param name="src">Source sequence.</param>
/// <param name="keepalives">Maps a source element to a sequence of keep-alive elements.</param>
/// <returns>The sequence produced by the <c>Publish</c> selector, i.e. a merge that includes the keep-alive stream.</returns>
public static IObservable<TSource> KeepAlive<TSource>(
this IObservable<TSource> src,
Func<TSource, IObservable<TSource>> keepalives)
// Publish hands the selector a shared view of src (`shareable`) so the lambda can
// use the source more than once.
=> src.Publish(shareable => {
// Turn every source element into its keep-alive sequence.
// NOTE(review): the name suggests only the most recent inner sequence stays active — confirm how this is flattened.
var latestKeepAlive = shareable.Select(keepalives)
// IgnoreElements drops every element and Append adds one default(TSource) at the end,
// so this yields a single value once the shared source completes.
// NOTE(review): presumably used as a stop signal for the keep-alives — confirm.
shareable.IgnoreElements()
.Append(default(TSource))
// NOTE(review): this merges `src`, not `shareable`; inside the Publish selector that
// would subscribe to the underlying source a second time — confirm whether `shareable` was intended.
return src.Merge(latestKeepAlive);
/// <summary>
/// Overload that takes a fixed <paramref name="interval"/> and a fixed keep-alive value <paramref name="msg"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the source and of the keep-alive value.</typeparam>
/// <param name="src">Source sequence.</param>
/// <param name="interval">Period passed to <c>Observable.Interval</c> and to <c>DelaySubscription</c>.</param>
/// <param name="msg">Keep-alive value; presumably emitted on every tick — confirm.</param>
/// <returns>An observable of <typeparamref name="TSource"/>; presumably the result of the selector-based overload — confirm.</returns>
public static IObservable<TSource> KeepAlive<TSource>(
this IObservable<TSource> src,
TimeSpan interval, TSource msg)
// Periodic timer with the given period; DelaySubscription postpones subscribing by the same amount.
// NOTE(review): Interval already waits one period before its first tick, so if the delay
// applies directly to it the first keep-alive arrives after two periods — confirm intent.
Observable.Interval(interval)
.DelaySubscription(interval));
/// <summary>
/// Helpers for consuming messages as text.
/// </summary>
public static class MessagesAsTextHelper {
/// <summary>
/// Projects each message to a <c>TextReader</c>: a text message supplies its own
/// <c>Reader</c>, a binary message has its <c>Stream</c> wrapped in a UTF-8 <c>StreamReader</c>.
/// </summary>
/// <param name="messages">Source sequence of messages.</param>
/// <returns>One reader per message; nothing in this method disposes the readers it creates.</returns>
public static IObservable<TextReader> TextReader(this IObservable<IMessage> messages)
=> messages.Switch(tm => tm.Reader, bm => new StreamReader(bm.Stream, System.Text.Encoding.UTF8));
/// <summary>
/// Builds an observable of <typeparamref name="TResult"/> with <c>Observable.Create</c>
/// and an asynchronous subscribe callback.
/// </summary>
/// <typeparam name="TResult">Type produced from each character buffer.</typeparam>
/// <param name="f">Converts a <c>Memory&lt;char&gt;</c> buffer into a result.</param>
/// <param name="rent">Returns a <c>Memory&lt;char&gt;</c> for an int argument; presumably the requested buffer size — confirm.</param>
/// <param name="return">Receives a <c>Memory&lt;char&gt;</c>; presumably gives a rented buffer back — confirm.</param>
/// <returns>The observable created by <c>Observable.Create</c>.</returns>
// NOTE(review): no source-sequence parameter is visible in this signature — confirm what is being read.
public static IObservable<TResult> Read<TResult>(
Func<Memory<char>, TResult> f,
Func<int, Memory<char>> rent,
Action<Memory<char>> @return)
// (obs, ct): presumably the subscribing observer and a cancellation token — confirm.
=> Observable.Create(async (obs, ct) => {
/// <summary>
/// Holds a reader-sequence helper and the program entry point.
/// </summary>
public static class Util2 {
/// <summary>
/// Takes a sequence of <c>TextReader</c> and returns a sequence of <c>TextReader</c>.
/// </summary>
/// <param name="reader">Input sequence of readers.</param>
// NOTE(review): the name suggests parsing of chunked content — confirm against the body.
// NOTE(review): declared without `static` inside a static class; C# rejects instance
// members in static classes (CS0708) — confirm and add `static` if so.
public IObservable<TextReader> ContentChunkedParser(IObservable<TextReader> reader) {
/// <summary>
/// Entry point; writes "Hello World" to the console.
/// </summary>
public static void Main()
Console.WriteLine("Hello World");