using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
public static void Main()
Console.WriteLine("Hello World");
FailedMessageHandler fmh = new FailedMessageHandler( new Progress<string[]>((list) => { Console.WriteLine("Handling {0} messages. [{1}]", list.Length, string.Join(",", list));}));
Parallel.For(0,52, (i) => {fmh.Add(string.Format("Test {0,3}",i));});
var result = Parallel.For(53,107, (i) => {fmh.Add(string.Format("Test {0,3}",i));});
while(!result.IsCompleted)
public class FailedMessageHandler
private BlockingCollection<string> workQueue = new BlockingCollection<string>();
private List<string> currentBuffer = new List<string>(10);
private IProgress<string[]> progress;
private Thread workThread;
public FailedMessageHandler( IProgress<string[]> progress )
this.progress = progress;
workThread = new Thread(WatchDog);
public void Add( string failedMessage )
if ( workQueue.IsAddingCompleted )
throw new InvalidOperationException("Adding is completed!");
workQueue.Add(failedMessage);
CancellationTokenSource timeout = new CancellationTokenSource(TimeSpan.FromSeconds(1));
var failedMsg = workQueue.Take(timeout.Token);
currentBuffer.Add(failedMsg);
if( currentBuffer.Count >= 10 ){
progress.Report(currentBuffer.ToArray());
catch(OperationCanceledException)
Console.WriteLine("TIMEOUT!");
if( currentBuffer.Any() )
progress.Report(currentBuffer.ToArray());
catch(InvalidOperationException)
Console.WriteLine("COMPLETED!");
if( currentBuffer.Any() )
progress.Report(currentBuffer.ToArray());
Console.WriteLine("DONE!");
public void CompleteAdding()
workQueue.CompleteAdding();
public void AwaitCompletion()