using System.Collections.Generic;
using System.Threading.Tasks;
/// <summary>
/// Demo entry point: registers one handler under the request name "get_elevators",
/// then sends a hard-coded JSON request to every subscription stored for that name,
/// invoking each handler through reflection and awaiting its reply.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
/// <exception cref="InvalidOperationException">
/// The closed handler interface built for a subscription exposes no <c>Handle</c> method.
/// </exception>
static async Task Main(string[] args)
{
    var subscriptionsManager = new RequestReplyBusSubscriptionsManager();
    subscriptionsManager.AddSubscription<ReplyMessageExample, GetElevatorRequest, GetExampeRequestHandler>("get_elevators");

    IEnumerable<SubscriptionInfo> subscriptions = subscriptionsManager.GetSubscriptionsForEvent("get_elevators");
    foreach (var subscription in subscriptions)
    {
        // Build the handler the subscription actually names instead of a hard-coded type,
        // so the reflective call below always targets a compatible instance.
        object handler = Activator.CreateInstance(subscription.HandlerType);

        // IRequestMessageHandler<TRequest, TReply> closed over this subscription's types.
        Type handlerContract = typeof(IRequestMessageHandler<,>).MakeGenericType(subscription.RequestType, subscription.ReplyType);
        object integrationEvent = JsonConvert.DeserializeObject("{\"equipmentNumber\":\"AT9512CA\"}", subscription.RequestType);

        var handleMethod = handlerContract.GetMethod("Handle");
        if (handleMethod == null)
        {
            throw new InvalidOperationException($"'{handlerContract}' does not declare a Handle method.");
        }

        // Task<T> is invariant: a Task<ReplyMessageExample> cannot be cast to Task<dynamic>
        // (that cast throws InvalidCastException). Await through the non-generic Task base
        // and read the typed Result afterwards via reflection on Task<ReplyType>.
        var pending = (Task)handleMethod.Invoke(handler, new[] { integrationEvent });
        await pending;

        Type replyTaskType = typeof(Task<>).MakeGenericType(subscription.ReplyType);
        object reply = replyTaskType.GetProperty("Result")?.GetValue(pending);
    }

    Console.WriteLine("Done");
}
/// <summary>
/// Request message that identifies a piece of equipment by its number.
/// </summary>
public class GetElevatorRequest
{
    /// <summary>Equipment number the request refers to.</summary>
    public string EquipmentNumber { get; set; }
}
/// <summary>
/// Marker interface for reply messages; it declares no members.
/// </summary>
public interface IReplyMessage
{
}
/// <summary>
/// Example reply message: an equipment number together with a connection flag.
/// </summary>
public class ReplyMessageExample : IReplyMessage
{
    /// <summary>Equipment number this reply is about.</summary>
    public string EquipmentNumber { get; set; }

    /// <summary>Connection flag reported for the equipment.</summary>
    public bool Connected { get; set; }
}
/// <summary>
/// Example handler that answers a <see cref="GetElevatorRequest"/> with a
/// <see cref="ReplyMessageExample"/>.
/// </summary>
public class GetExampeRequestHandler : IRequestMessageHandler<GetElevatorRequest, ReplyMessageExample>
{
    /// <summary>
    /// Logs a line to the console and returns an already-completed task holding the reply.
    /// </summary>
    /// <param name="request">Request whose equipment number is copied into the reply.</param>
    /// <returns>
    /// A completed task with a reply carrying the same equipment number and a
    /// randomly chosen <c>Connected</c> value.
    /// </returns>
    public Task<ReplyMessageExample> Handle(GetElevatorRequest request)
    {
        Console.WriteLine("Processing request for elevator");

        var answer = new ReplyMessageExample();
        answer.EquipmentNumber = request.EquipmentNumber;
        // Connected is true exactly when the drawn random number is even.
        int draw = new Random().Next();
        answer.Connected = draw % 2 == 0;

        return Task.FromResult(answer);
    }
}
/// <summary>
/// Contract for a handler that turns a request message into a reply message asynchronously.
/// </summary>
/// <typeparam name="TRequest">Type of request accepted (contravariant).</typeparam>
/// <typeparam name="TReply">Type of reply produced.</typeparam>
public interface IRequestMessageHandler<in TRequest, TReply>
{
    /// <summary>Handles a single request.</summary>
    /// <param name="request">The request to handle.</param>
    /// <returns>A task that yields the reply.</returns>
    Task<TReply> Handle(TRequest request);
}
/// <summary>
/// In-memory registry that maps a request name to the handler subscriptions
/// registered for it.
/// </summary>
public sealed class RequestReplyBusSubscriptionsManager
{
    // Request name -> subscriptions registered under that name.
    private readonly Dictionary<string, List<SubscriptionInfo>> _subscriptions;

    /// <summary>Creates a registry with no subscriptions.</summary>
    public RequestReplyBusSubscriptionsManager()
    {
        _subscriptions = new Dictionary<string, List<SubscriptionInfo>>();
    }

    /// <summary>Tells whether any subscription list exists for the given name.</summary>
    /// <param name="eventType">Request name to look up.</param>
    /// <returns><c>true</c> when the name is known to the registry.</returns>
    public bool HasSubscriptionsForRequest(string eventType) => _subscriptions.ContainsKey(eventType);

    /// <summary>
    /// Registers <typeparamref name="TRequestHandler"/> as a handler for
    /// <paramref name="requestName"/>.
    /// </summary>
    /// <typeparam name="TReply">Reply type produced by the handler.</typeparam>
    /// <typeparam name="TRequest">Request type consumed by the handler.</typeparam>
    /// <typeparam name="TRequestHandler">Concrete handler type.</typeparam>
    /// <param name="requestName">Name the handler is registered under.</param>
    /// <exception cref="ArgumentException">
    /// The same handler type is already registered under <paramref name="requestName"/>.
    /// </exception>
    public void AddSubscription<TReply, TRequest, TRequestHandler>(string requestName)
        where TRequestHandler : IRequestMessageHandler<TRequest, TReply>
    {
        Type handlerType = typeof(TRequestHandler);

        List<SubscriptionInfo> registered;
        if (!_subscriptions.TryGetValue(requestName, out registered))
        {
            // First registration for this name: start a fresh list.
            registered = new List<SubscriptionInfo>();
            _subscriptions.Add(requestName, registered);
        }
        else if (registered.Exists(existing => existing.HandlerType == handlerType))
        {
            throw new ArgumentException($"Handler Type { handlerType.Name } already registered for '{requestName}' with reply type '{typeof(TReply)}'");
        }

        registered.Add(new SubscriptionInfo(typeof(TReply), typeof(TRequest), handlerType));
    }

    /// <summary>Returns the subscriptions stored under the given name.</summary>
    /// <param name="eventType">Request name to look up.</param>
    /// <returns>The registry's own list for that name, exposed as a sequence.</returns>
    /// <exception cref="KeyNotFoundException">No subscription exists for the name.</exception>
    public IEnumerable<SubscriptionInfo> GetSubscriptionsForEvent(string eventType) => _subscriptions[eventType];
}
/// <summary>
/// Immutable description of one subscription: the reply type, the request type
/// and the handler type that connects them.
/// </summary>
public sealed class SubscriptionInfo
{
    /// <summary>Creates a subscription description.</summary>
    /// <param name="replyType">Type of the reply message.</param>
    /// <param name="requestType">Type of the request message.</param>
    /// <param name="handlerType">Type of the handler.</param>
    public SubscriptionInfo(Type replyType, Type requestType, Type handlerType)
    {
        // ReplyType was previously never assigned and therefore always null.
        ReplyType = replyType;
        RequestType = requestType;
        HandlerType = handlerType;
    }

    /// <summary>Type of the reply message.</summary>
    public Type ReplyType { get; }

    /// <summary>Type of the handler.</summary>
    public Type HandlerType { get; }

    /// <summary>Type of the request message.</summary>
    public Type RequestType { get; }
}