using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
/// <summary>
/// Minimal generic message queue that delegates all storage to a
/// <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the messages held by the queue.</typeparam>
public class MessageQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    /// <summary>
    /// Appends <paramref name="message"/> to the underlying queue.
    /// </summary>
    /// <param name="message">Message to store; no validation is performed.</param>
    public void Enqueue(T message)
    {
        _queue.Enqueue(message);
    }

    /// <summary>
    /// Attempts to remove the message at the head of the underlying queue.
    /// </summary>
    /// <param name="message">
    /// Receives the removed message on success, or the default value of
    /// <typeparamref name="T"/> when nothing was available.
    /// </param>
    /// <returns>
    /// <c>true</c> if a message was removed; <c>false</c> if the queue was empty.
    /// </returns>
    public bool TryDequeue(out T message)
    {
        return _queue.TryDequeue(out message);
    }
}
/// <summary>
/// Simple mutable event envelope carrying an identifier, source, type,
/// timestamp and an arbitrary payload.
/// </summary>
public class CloudEvent
{
    /// <summary>Identifier of the event.</summary>
    public string Id { get; set; }

    /// <summary>Origin of the event.</summary>
    public string Source { get; set; }

    /// <summary>Type name of the event, e.g. <c>com.example.object.created</c>.</summary>
    public string Type { get; set; }

    /// <summary>Timestamp associated with the event.</summary>
    public DateTime Time { get; set; }

    /// <summary>Arbitrary payload attached to the event.</summary>
    public object Data { get; set; }

    /// <summary>
    /// Builds a single-line, human-readable representation listing every property.
    /// </summary>
    /// <returns>
    /// A string of the form <c>CloudEvent(Id=…, Source=…, Type=…, Time=…, Data=…)</c>.
    /// </returns>
    public override string ToString()
    {
        return $"CloudEvent(Id={Id}, Source={Source}, Type={Type}, Time={Time}, Data={Data})";
    }
}
/// <summary>
/// Contract for components that publish <see cref="CloudEvent"/> instances.
/// </summary>
public interface ICloudEventProducer
{
    /// <summary>
    /// Publishes <paramref name="cloudEvent"/> towards <paramref name="destination"/>.
    /// </summary>
    /// <param name="destination">
    /// Name of the target; how it is interpreted is left to the implementation.
    /// </param>
    /// <param name="cloudEvent">Event to publish.</param>
    /// <returns>A task representing the send operation.</returns>
    Task SendEventAsync(string destination, CloudEvent cloudEvent);
}
/// <summary>
/// Contract for components that receive and handle <see cref="CloudEvent"/> instances.
/// </summary>
public interface ICloudEventConsumer
{
    /// <summary>
    /// Begins consuming events associated with <paramref name="source"/>.
    /// </summary>
    /// <param name="source">
    /// Name of the origin to consume from; how it is interpreted is left to the implementation.
    /// </param>
    /// <returns>A task representing the consumption operation.</returns>
    Task StartConsumingAsync(string source);

    /// <summary>
    /// Handles a single received event.
    /// </summary>
    /// <param name="cloudEvent">Event to handle.</param>
    /// <returns>A task representing the handling of the event.</returns>
    Task ProcessMessageAsync(CloudEvent cloudEvent);
}
/// <summary>
/// <see cref="ICloudEventProducer"/> that publishes events by placing them on
/// an in-process <see cref="MessageQueue{T}"/>.
/// </summary>
public class InMemoryCloudEventProducer : ICloudEventProducer
{
    private readonly MessageQueue<CloudEvent> _queue;

    /// <summary>
    /// Creates a producer that writes to <paramref name="queue"/>.
    /// </summary>
    /// <param name="queue">Queue that receives every sent event.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queue"/> is <c>null</c>.
    /// </exception>
    public InMemoryCloudEventProducer(MessageQueue<CloudEvent> queue)
    {
        // Fail at construction instead of with a NullReferenceException on first send.
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    /// <summary>
    /// Logs the event to the console and enqueues it.
    /// </summary>
    /// <param name="destination">
    /// Only used in the log message; this implementation has a single queue
    /// and performs no routing.
    /// </param>
    /// <param name="cloudEvent">Event to enqueue.</param>
    /// <returns>An already-completed task; the work is done synchronously.</returns>
    public Task SendEventAsync(string destination, CloudEvent cloudEvent)
    {
        Console.WriteLine($"Sending CloudEvent to {destination}: {cloudEvent}");
        _queue.Enqueue(cloudEvent);
        return Task.CompletedTask;
    }
}
/// <summary>
/// <see cref="ICloudEventConsumer"/> that reads events from an in-process
/// <see cref="MessageQueue{T}"/> and writes them to the console.
/// </summary>
public class InMemoryCloudEventConsumer : ICloudEventConsumer
{
    private readonly MessageQueue<CloudEvent> _queue;

    /// <summary>
    /// Creates a consumer that reads from <paramref name="queue"/>.
    /// </summary>
    /// <param name="queue">Queue to drain when consuming.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queue"/> is <c>null</c>.
    /// </exception>
    public InMemoryCloudEventConsumer(MessageQueue<CloudEvent> queue)
    {
        // Fail at construction instead of with a NullReferenceException on first use.
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    /// <summary>
    /// Dequeues and processes events one at a time until the queue is empty,
    /// then returns. It does not wait for events that arrive later.
    /// </summary>
    /// <param name="source">
    /// Not used by this implementation; there is a single queue and no filtering.
    /// </param>
    /// <returns>A task that completes once the queue has been drained.</returns>
    public async Task StartConsumingAsync(string source)
    {
        while (_queue.TryDequeue(out var cloudEvent))
        {
            await ProcessMessageAsync(cloudEvent);
        }
    }

    /// <summary>
    /// Handles a single event by writing it to the console.
    /// </summary>
    /// <param name="cloudEvent">Event to report.</param>
    /// <returns>An already-completed task; the work is done synchronously.</returns>
    public Task ProcessMessageAsync(CloudEvent cloudEvent)
    {
        Console.WriteLine($"Processing CloudEvent: {cloudEvent}");
        return Task.CompletedTask;
    }
}
// NOTE(review): the enclosing class declaration is not visible in the reviewed
// source; the conventional name "Program" is assumed here — confirm.
public class Program
{
    /// <summary>
    /// Demo entry point: wires a producer and a consumer to one shared queue,
    /// sends two events, then drains the queue.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A task that completes once both events have been consumed.</returns>
    public static async Task Main(string[] args)
    {
        // Producer and consumer must share the same queue instance to communicate.
        var messageQueue = new MessageQueue<CloudEvent>();
        ICloudEventProducer producer = new InMemoryCloudEventProducer(messageQueue);
        ICloudEventConsumer consumer = new InMemoryCloudEventConsumer(messageQueue);

        var cloudEvent1 = new CloudEvent
        {
            Type = "com.example.object.created",
        };

        var cloudEvent2 = new CloudEvent
        {
            Type = "com.example.object.updated",
        };

        await producer.SendEventAsync("Queue1", cloudEvent1);
        await producer.SendEventAsync("Queue1", cloudEvent2);

        // Consume only after both sends: the consumer stops once the queue is empty.
        await consumer.StartConsumingAsync("Queue1");
    }
}