using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public static async Task Main()
Faker<ThePipeline.Settings> settingsFaker = new Faker<ThePipeline.Settings>()
.CustomInstantiator(f => new ThePipeline.Settings(settingsIds++, f.Name.FullName(), f.Random.Bool()));
var p = new ThePipeline();
using var goodSub = p.Good.Subscribe(s => Console.WriteLine("[GOOD] : {0}", s));
using var badSub = p.Bad.Subscribe(s => Console.WriteLine("[BAD] : {0}", s));
using var errSub = p.Error.Subscribe(s => Console.WriteLine("[ERROR] : {0}", s));
using var pipeSub = Observable.Range(0, 100).Select(_ => settingsFaker.Generate()).Subscribe(p.Input);
private TransformBlock<Settings, StageResult> _stage1;
private ExecutionDataflowBlockOptions _stage1Options = new ExecutionDataflowBlockOptions(){
MaxDegreeOfParallelism = 1
private TransformBlock<StageResult, StageResult> _stage2;
private ExecutionDataflowBlockOptions _stage2Options = new ExecutionDataflowBlockOptions(){
MaxDegreeOfParallelism = 2
private TransformBlock<StageResult, string> _theGood;
private TransformBlock<StageResult, string> _theBad;
private TransformBlock<StageResult, string> _theUgly;
var linkOptions = new DataflowLinkOptions()
PropagateCompletion = true
_theGood = new TransformBlock<StageResult, string>(o => o.ToString());
_theBad = new TransformBlock<StageResult, string>(o => o.ToString());
_theUgly = new TransformBlock<StageResult, string>(o => o.ToString());
_stage1 = new TransformBlock<Settings, StageResult>(StageOneAsync, _stage1Options);
_stage2 = new TransformBlock<StageResult, StageResult>(StageTwoAsync, _stage2Options);
_stage1.LinkTo(_theBad, linkOptions, r => r is StageBadResult);
_stage1.LinkTo(_theUgly, linkOptions, r => r is StageUglyResult);
_stage2.LinkTo(_theBad, linkOptions, r => r is StageBadResult);
_stage2.LinkTo(_theUgly, linkOptions, r => r is StageUglyResult);
_stage1.LinkTo(_stage2, linkOptions, r => r is StageGoodResult);
_stage2.LinkTo(_theGood, linkOptions, r => r is StageGoodResult);
public Task Completion => Task.WhenAll(_theGood.Completion, _theBad.Completion, _theUgly.Completion);
public IObserver<Settings> Input => _stage1.AsObserver();
public IObservable<string> Good => _theGood.AsObservable();
public IObservable<string> Bad => _theBad.AsObservable();
public IObservable<string> Error => _theUgly.AsObservable();
private async Task<StageResult> StageOneAsync(Settings s)
return await RandomizeReturn(s, 1);
return new StageUglyResult(s, 1, ex.Message );
private async Task<StageResult> StageTwoAsync(StageResult s)
return await RandomizeReturn(s.Settings, 2);
return new StageUglyResult(s.Settings, 2, ex.Message );
private Task<StageResult> RandomizeReturn(Settings s, int stage)
return Random.Shared.Next(100) switch
< 10 => throw new ApplicationException("Naw, that's ugly!"),
< 30 => Task.FromResult(new StageBadResult(s, stage) as StageResult),
_ => Task.FromResult(new StageGoodResult(s, stage) as StageResult)
public record Settings(int Id, string Name, bool IsSomething );
private record StageResult ( Settings Settings, int Stage );
private record StageGoodResult ( Settings Settings, int Stage ) : StageResult ( Settings, Stage );
private record StageBadResult ( Settings Settings, int Stage ) : StageResult ( Settings, Stage );
private record StageUglyResult ( Settings Settings, int Stage, string Error ) : StageResult ( Settings, Stage );