using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
var result = await Test();
foreach (var item in result.Data)
Console.WriteLine($"Error {result.Error.Message}");
static async Task<Result<IEnumerable<Details>>> Test()
var client = new IBClient();
var results = new ReplaySubject<Details>();
var endSignal = client.DetailsEnd
.Any(x => x.RequestId == requestId);
var errorSignal = client.Error
.Where(x => x.RequestId == requestId)
.Subscribe(x => results.OnError(new Exception($"Code: {x.Code}, Message: {x.Message}")));
.Where(x => x.RequestId == requestId)
.Timeout(TimeSpan.FromSeconds(2))
.Catch<Details, TimeoutException>(ex =>
Console.WriteLine("Called");
return Observable.Empty<Details>();
.ObserveOn(TaskPoolScheduler.Default)
client.Emit(requestId, 42);
client.Emit(requestId, 62);
client.Emit(requestId, 123);
client.EmitEnd(requestId);
client.Emit(requestId, 1);
var items = results.ToEnumerable();
return Result<IEnumerable<Details>>.FromSuccess(items);
public record Details(int RequestId, int Value);
public record DetailsEnd(int RequestId);
public record ErrorHappened(int RequestId, int Code, string Message);
private readonly Subject<ErrorHappened> _errorSubject = new();
public IObservable<ErrorHappened> Error => _errorSubject.AsObservable();
private readonly Subject<Details> _subjectDetailsSubject = new();
public IObservable<Details> Details => _subjectDetailsSubject.AsObservable();
private readonly Subject<DetailsEnd> _subjectDetailsEndSubject = new();
public IObservable<DetailsEnd> DetailsEnd => _subjectDetailsEndSubject.AsObservable();
public void Emit(int requestId, int value) => _subjectDetailsSubject.OnNext(new Details(requestId, value));
public void EmitEnd(int requestId) => _subjectDetailsEndSubject.OnNext(new DetailsEnd(requestId));
public void EmitError(int requestId, int code, string message) => _errorSubject.OnNext(new ErrorHappened(requestId, code, message));
public record Error(int? Code, string Message, object? Data);
[property: MemberNotNullWhen(true, nameof(Result<T>.Data))]
[property: MemberNotNullWhen(false, nameof(Result<T>.Error))]
bool Success, T? Data, Error? Error)
public static Result<T> FromSuccess(T data) => new(true, data, default);
public static Result<T> FromError<TError>(TError error) where TError : Error => new(false, default, error);