struct Grain<T> where T : class {
public Exception Failure;
class Channel<T> where T : class {
public Channel(int bufferSize) { throw new Exception("not implemented"); }
public bool Send(Grain<T> grain) { throw new Exception("not implemented"); }
public Grain<T> Receive() { throw new Exception("not implemented"); }
public void Close() { throw new Exception("not implemented"); }
public void Wait() { throw new Exception("not implemented"); }
static void Start<T>(Channel<T> dst, int threadsNumber, Func<Grain<T>> provider) where T : class {
int threadsLeft = threadsNumber;
for (var i = 0; i < threadsNumber; ++i) {
if (output.Data == null && output.Failure == null) {
if (Interlocked.Decrement(ref threadsLeft) == 0)
if (!dst.Send(output) || output.Failure != null)
static void Pipe<I, O>(Channel<I> src, Channel<O> dst, int threadsNumber, Func<I, Grain<O>> processor)
where I : class where O : class {
int threadsLeft = threadsNumber;
for (var i = 0; i < threadsNumber; ++i) {
var input = src.Receive();
if (input.Failure != null) {
dst.Send(new Grain<O>() {Data = null, Failure = input.Failure});
} else if (input.Data == null) {
if (Interlocked.Decrement(ref threadsLeft) == 0)
var output = processor(input.Data);
if (!dst.Send(output) || output.Failure != null) {
static void End<T>(Channel<T> src, int threadsNumber, Action<T> consumer) where T : class {
int threadsLeft = threadsNumber;
for (var i = 0; i < threadsNumber; ++i) {
var input = src.Receive();
public static void Main()
var reader2compressors = new Channel<Bytes>(compressors * 2);
var compressors2writer = new Channel<Bytes>(compressors * 2);
Func<Grain<Bytes>> reader = () => {
throw new Exception("not implemented");
Func<Bytes, Grain<Bytes>> compressor = (input) => {
throw new Exception("not implemented");
Action<Bytes> writer = (intput) => {
throw new Exception("not implemented");
Start(reader2compressors, 1 , reader);
Pipe(reader2compressors, compressors2writer, compressors, compressor);
End(compressors2writer, 1 , writer);
compressors2writer.Wait();