using System.Threading.Tasks;
using System.Collections.Generic;
/// <summary>
/// Demo entry point: drains a <see cref="FakeQueue"/> and passes each raw message through a
/// <see cref="FooBarDeepQueueMessageHandler"/>, printing whether the message was handled.
/// </summary>
/// <returns>A task that completes once the queue reports no more messages.</returns>
public static async Task Main()
{
    var queue = new FakeQueue();

    // One processor per supported operation name; the handler dispatches on this key.
    var processors = new Dictionary<string, IDeepQueueMessageProcessor>
    {
        [DeepQueueOperation.Foo] = new FooOperationDeepQueueMessageProcessor(),
        [DeepQueueOperation.Bar] = new BarOperationDeepQueueMessageProcessor()
    };
    var queueHandler = new FooBarDeepQueueMessageHandler(processors);

    // CancellationTokenSource is IDisposable, so scope it instead of leaking it.
    using (var cts = new CancellationTokenSource())
    {
        while (queue.TryGetMessage(out var message))
        {
            Console.WriteLine($"Got a message! {message}");
            var context = new DeepQueueContext(message);
            context = await queueHandler.HandleAsync(context, cts.Token);
            Console.WriteLine($"Message was handled? {context.Response.IsHandled}");
        }
    }

    Console.WriteLine("Done");
}
// Backing store for the fake queue; every entry is one JSON-serialized message.
private Queue<string> _messages = new Queue<string>();
// NOTE(review): the next six lines are members of object initializers whose enclosing
// declarations (message1..message4) are not visible in this view; any payload members
// are not visible either. Two initializers use Foo with a fresh CorrelationId and two use Bar.
Operation = DeepQueueOperation.Foo,
CorrelationId = Guid.NewGuid(),
Operation = DeepQueueOperation.Bar,
Operation = DeepQueueOperation.Bar,
Operation = DeepQueueOperation.Foo,
CorrelationId = Guid.NewGuid(),
// Serialize each message to JSON and enqueue it, in declaration order.
_messages.Enqueue(JsonSerializer.Serialize(message1));
_messages.Enqueue(JsonSerializer.Serialize(message2));
_messages.Enqueue(JsonSerializer.Serialize(message3));
_messages.Enqueue(JsonSerializer.Serialize(message4));
/// <summary>Attempts to remove the next raw JSON message from the queue.</summary>
/// <param name="message">Receives the dequeued JSON string when one is available.</param>
/// <returns><c>true</c> if a message was dequeued; <c>false</c> if the queue is empty.</returns>
public bool TryGetMessage(out string message)
return _messages.TryDequeue(out message);
/// <summary>
/// Contract for a component that takes a <see cref="DeepQueueContext"/> and asynchronously
/// produces the context that results from handling it.
/// </summary>
public interface IDeepQueueMessageHandler
/// <summary>Handles the message carried by <paramref name="context"/>.</summary>
/// <param name="context">Context wrapping the message to handle.</param>
/// <param name="token">Optional cancellation token; defaults to <c>default</c>.</param>
/// <returns>A task yielding the context after handling.</returns>
Task<DeepQueueContext> HandleAsync(DeepQueueContext context, CancellationToken token = default);
/// <summary>
/// Message handler that dispatches each message to the <see cref="IDeepQueueMessageProcessor"/>
/// registered under the message's operation name.
/// </summary>
public class FooBarDeepQueueMessageHandler : IDeepQueueMessageHandler
{
    private readonly Dictionary<string, IDeepQueueMessageProcessor> _processors;

    /// <summary>Creates a handler over the given operation-name-to-processor map.</summary>
    /// <param name="processors">Processors keyed by operation name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="processors"/> is <c>null</c>.</exception>
    public FooBarDeepQueueMessageHandler(Dictionary<string, IDeepQueueMessageProcessor> processors)
    {
        // Fail at construction rather than with a NullReferenceException on the first message.
        _processors = processors ?? throw new ArgumentNullException(nameof(processors));
    }

    /// <summary>
    /// Prepares the context, selects the processor for its operation, lets the processor build
    /// the typed request, runs it, and stores the result in <see cref="DeepQueueContext.Response"/>.
    /// </summary>
    /// <param name="context">Context wrapping the raw message.</param>
    /// <param name="token">Cancellation token; forwarded to the processor.</param>
    /// <returns>The context with its request and response populated.</returns>
    /// <exception cref="ArgumentNullException">The message's operation is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentOutOfRangeException">No processor is registered for the operation.</exception>
    public async Task<DeepQueueContext> HandleAsync(DeepQueueContext context, CancellationToken token)
    {
        context = context.PrepareForProcessing();
        var processor = FindProcessorForOperation(context.Operation);
        context = processor.EnsureContextRequestFor(context);

        // Pass the caller's token through; previously it was accepted but silently dropped.
        context.Response = await processor.ProcessAsync(context.Request, token);
        return context;
    }

    /// <summary>Looks up the processor registered for <paramref name="operation"/>.</summary>
    private IDeepQueueMessageProcessor FindProcessorForOperation(string operation)
    {
        if (string.IsNullOrWhiteSpace(operation))
        {
            throw new ArgumentNullException(nameof(operation));
        }

        // Single lookup instead of ContainsKey followed by the indexer.
        if (!_processors.TryGetValue(operation, out var processor))
        {
            throw new ArgumentOutOfRangeException(
                nameof(operation), operation, "No processor is registered for this operation.");
        }

        return processor;
    }
}
/// <summary>
/// Per-message state: the raw JSON message, the operation name and payload extracted from it,
/// and the typed request/response attached while the message is handled.
/// </summary>
public class DeepQueueContext
{
    /// <summary>Typed request; assigned by the processor's <c>EnsureContextRequestFor</c>.</summary>
    public IDeepQueueOperationRequest Request { get; set; }

    /// <summary>Result of processing; assigned by the message handler.</summary>
    public IDeepQueueOperationResponse Response { get; set; }

    /// <summary>The message exactly as it was supplied to the constructor.</summary>
    public string RawJsonMessage { get; private set; }

    /// <summary>Value of the message's "Operation" property; set by <see cref="PrepareForProcessing"/>.</summary>
    public string Operation { get; private set; }

    /// <summary>Text of the message's "Payload" property; set by <see cref="PrepareForProcessing"/>.</summary>
    public string PayloadJson { get; private set; }

    /// <summary>Wraps a raw JSON message; nothing is parsed until <see cref="PrepareForProcessing"/>.</summary>
    /// <param name="rawMessageJson">The raw JSON text of the message.</param>
    public DeepQueueContext(string rawMessageJson)
    {
        RawJsonMessage = rawMessageJson;
    }

    /// <summary>
    /// Parses <see cref="RawJsonMessage"/> (trailing commas allowed) and fills in
    /// <see cref="Operation"/> and <see cref="PayloadJson"/>.
    /// </summary>
    /// <returns>This same instance, so the call can be assigned or chained.</returns>
    /// <exception cref="JsonException">The raw message is not valid JSON.</exception>
    /// <exception cref="KeyNotFoundException">"Operation" or "Payload" is missing.</exception>
    public DeepQueueContext PrepareForProcessing()
    {
        // JsonDocument rents pooled buffers and must be disposed. Both values are copied
        // into strings before the document goes out of scope, so nothing dangles.
        using (var jsonDoc = JsonDocument.Parse(RawJsonMessage, new JsonDocumentOptions { AllowTrailingCommas = true }))
        {
            var root = jsonDoc.RootElement;
            Operation = root.GetProperty("Operation").GetString();
            PayloadJson = root.GetProperty("Payload").ToString();
        }

        return this;
    }
}
/// <summary>
/// Contract for a processor that handles one kind of queue operation: it builds the typed
/// request on a context and turns a request into a response.
/// </summary>
public interface IDeepQueueMessageProcessor
/// <summary>Name of the operation this processor is associated with.</summary>
string OperationName { get; }
/// <summary>Processes <paramref name="request"/> and produces a response.</summary>
/// <param name="request">The request to process.</param>
/// <param name="token">Optional cancellation token; defaults to <c>default</c>.</param>
/// <returns>A task yielding the response for the request.</returns>
Task<IDeepQueueOperationResponse> ProcessAsync(IDeepQueueOperationRequest request, CancellationToken token = default);
/// <summary>Ensures <paramref name="context"/> carries a request suitable for this processor.</summary>
/// <param name="context">The context to populate.</param>
/// <returns>The context to continue processing with.</returns>
DeepQueueContext EnsureContextRequestFor(DeepQueueContext context);
/// <summary>
/// Common type for operation requests passed to <c>IDeepQueueMessageProcessor.ProcessAsync</c>.
/// NOTE(review): no members are visible in this view; presumably a marker interface - confirm.
/// </summary>
public interface IDeepQueueOperationRequest
/// <summary>Common shape of the result a processor returns for an operation.</summary>
public interface IDeepQueueOperationResponse
/// <summary>Whether the processor reports the operation as handled.</summary>
bool IsHandled { get; set; }
/// <summary>Request for the Foo operation; carries a single string value.</summary>
public class FooDeepQueueOperationRequest : IDeepQueueOperationRequest
/// <summary>The Foo data; settable only from inside this class.</summary>
public string Data { get; private set; }
/// <summary>Creates a Foo request from <paramref name="data"/>.</summary>
/// <remarks>NOTE(review): the constructor body is not visible here; presumably it assigns Data - confirm.</remarks>
public FooDeepQueueOperationRequest(string data)
/// <summary>Returns this request serialized as JSON.</summary>
public override string ToString()
return JsonSerializer.Serialize(this);
/// <summary>Abstract base for operation responses; supplies the <see cref="IsHandled"/> property.</summary>
public abstract class DeepQueueOperationResponseBase : IDeepQueueOperationResponse
/// <summary>Whether the operation was handled.</summary>
public bool IsHandled { get; set; }
/// <summary>Parameterless constructor. NOTE(review): its body is not visible in this view.</summary>
public DeepQueueOperationResponseBase()
/// <summary>Response produced for the Foo operation.</summary>
public class FooDeepQueueOperationResponse : DeepQueueOperationResponseBase
/// <summary>Creates a response from a handled flag.</summary>
/// <remarks>NOTE(review): the constructor body is not visible here; presumably it assigns IsHandled - confirm.</remarks>
public FooDeepQueueOperationResponse(bool handled)
/// <summary>Creates a response by delegating to the flag constructor with <c>false</c>.</summary>
public FooDeepQueueOperationResponse() : this(false)
/// <summary>Request for the Bar operation; carries a single 64-bit integer.</summary>
public class BarDeepQueueOperationRequest : IDeepQueueOperationRequest
/// <summary>The Bar number; settable only from inside this class.</summary>
public long Number { get; private set; }
/// <summary>Creates a Bar request from <paramref name="number"/>.</summary>
/// <remarks>NOTE(review): the constructor body is not visible here; presumably it assigns Number - confirm.</remarks>
public BarDeepQueueOperationRequest(long number)
/// <summary>Returns this request serialized as JSON.</summary>
public override string ToString()
return JsonSerializer.Serialize(this);
/// <summary>Response produced for the Bar operation.</summary>
public class BarDeepQueueOperationResponse : DeepQueueOperationResponseBase
/// <summary>Creates a response from a handled flag.</summary>
/// <remarks>NOTE(review): the constructor body is not visible here; presumably it assigns IsHandled - confirm.</remarks>
public BarDeepQueueOperationResponse(bool handled)
/// <summary>Creates a response by delegating to the flag constructor with <c>false</c>.</summary>
public BarDeepQueueOperationResponse() : this(false)
/// <summary>
/// Processor for the Foo operation: reads "FooData" from the payload JSON into a
/// <see cref="FooDeepQueueOperationRequest"/> and reports the request as handled when
/// that data is not blank.
/// </summary>
public class FooOperationDeepQueueMessageProcessor : IDeepQueueMessageProcessor
{
    /// <summary>The operation name this processor serves (<see cref="DeepQueueOperation.Foo"/>).</summary>
    public string OperationName => DeepQueueOperation.Foo;

    /// <summary>Logs the Foo request to the console and builds its response.</summary>
    /// <param name="request">Must be a <see cref="FooDeepQueueOperationRequest"/>.</param>
    /// <param name="token">Cancellation token; not observed because no asynchronous work is done.</param>
    /// <returns>A completed task whose response is handled when the Foo data is not null or whitespace.</returns>
    /// <exception cref="ArgumentException"><paramref name="request"/> is null or not a Foo request.</exception>
    public Task<IDeepQueueOperationResponse> ProcessAsync(IDeepQueueOperationRequest request, CancellationToken token)
    {
        // Reject a wrong or null request up front; the unchecked `as` cast used to print a
        // half-finished log line and then fail with a NullReferenceException.
        if (!(request is FooDeepQueueOperationRequest fooRequest))
        {
            throw new ArgumentException(
                $"Expected a {nameof(FooDeepQueueOperationRequest)}.", nameof(request));
        }

        Console.WriteLine($"Processing Foo Operation for {fooRequest}");
        Console.WriteLine($"The foo data is \"{fooRequest.Data}\"");

        // Nothing is awaited here, so hand back an already-completed task.
        return Task.FromResult<IDeepQueueOperationResponse>(
            new FooDeepQueueOperationResponse(!string.IsNullOrWhiteSpace(fooRequest.Data)));
    }

    /// <summary>Parses the context's payload JSON and attaches a Foo request built from "FooData".</summary>
    /// <param name="context">Context whose <c>PayloadJson</c> has already been populated.</param>
    /// <returns>The same context, with <c>Request</c> set.</returns>
    /// <exception cref="JsonException">The payload is not valid JSON.</exception>
    /// <exception cref="KeyNotFoundException">The payload has no "FooData" property.</exception>
    public DeepQueueContext EnsureContextRequestFor(DeepQueueContext context)
    {
        string fooData;

        // JsonDocument rents pooled buffers; dispose it once the value has been copied out.
        using (var jsonDoc = JsonDocument.Parse(context.PayloadJson, new JsonDocumentOptions { AllowTrailingCommas = true }))
        {
            fooData = jsonDoc.RootElement.GetProperty("FooData").GetString();
        }

        context.Request = new FooDeepQueueOperationRequest(fooData);
        return context;
    }
}
/// <summary>
/// Processor for the Bar operation: reads "BarNum" from the payload JSON into a
/// <see cref="BarDeepQueueOperationRequest"/> and reports the request as handled when
/// that number is greater than -1.
/// </summary>
public class BarOperationDeepQueueMessageProcessor : IDeepQueueMessageProcessor
{
    /// <summary>The operation name this processor serves (<see cref="DeepQueueOperation.Bar"/>).</summary>
    public string OperationName => DeepQueueOperation.Bar;

    /// <summary>Logs the Bar request to the console and builds its response.</summary>
    /// <param name="request">Must be a <see cref="BarDeepQueueOperationRequest"/>.</param>
    /// <param name="token">Cancellation token; not observed because no asynchronous work is done.</param>
    /// <returns>A completed task whose response is handled when the Bar number is greater than -1.</returns>
    /// <exception cref="ArgumentException"><paramref name="request"/> is null or not a Bar request.</exception>
    public Task<IDeepQueueOperationResponse> ProcessAsync(IDeepQueueOperationRequest request, CancellationToken token)
    {
        // Reject a wrong or null request up front; the unchecked `as` cast used to print a
        // half-finished log line and then fail with a NullReferenceException.
        if (!(request is BarDeepQueueOperationRequest barRequest))
        {
            throw new ArgumentException(
                $"Expected a {nameof(BarDeepQueueOperationRequest)}.", nameof(request));
        }

        Console.WriteLine($"Processing Bar Operation for {barRequest}");
        Console.WriteLine($"The bar number is {barRequest.Number}");

        // Nothing is awaited here, so hand back an already-completed task.
        return Task.FromResult<IDeepQueueOperationResponse>(
            new BarDeepQueueOperationResponse(barRequest.Number > -1));
    }

    /// <summary>Parses the context's payload JSON and attaches a Bar request built from "BarNum".</summary>
    /// <param name="context">Context whose <c>PayloadJson</c> has already been populated.</param>
    /// <returns>The same context, with <c>Request</c> set.</returns>
    /// <exception cref="JsonException">The payload is not valid JSON.</exception>
    /// <exception cref="KeyNotFoundException">The payload has no "BarNum" property.</exception>
    public DeepQueueContext EnsureContextRequestFor(DeepQueueContext context)
    {
        long number;

        // JsonDocument rents pooled buffers; dispose it once the value has been copied out.
        using (var jsonDoc = JsonDocument.Parse(context.PayloadJson, new JsonDocumentOptions { AllowTrailingCommas = true }))
        {
            number = jsonDoc.RootElement.GetProperty("BarNum").GetInt64();
        }

        context.Request = new BarDeepQueueOperationRequest(number);
        return context;
    }
}
/// <summary>
/// Operation names used as the "Operation" value of a message and as the keys of the
/// handler's processor dictionary.
/// </summary>
public static class DeepQueueOperation
/// <summary>Operation name served by the Foo processor.</summary>
public const string Foo = "ApplyFoo";
/// <summary>Operation name served by the Bar processor.</summary>
public const string Bar = "ApplyBar";