using System.Threading.Tasks;
public class CashCorrectionEvent
public string TradeSide { get; set; }
public decimal Amount { get; set; }
public interface IEventsConsumer<T> : IDisposable
Task<T> ConsumeAsync(CancellationToken token);
internal sealed class CashCorrectionEventsConsumer : IEventsConsumer<CashCorrectionEvent>
public Task<CashCorrectionEvent> ConsumeAsync(CancellationToken token) => Task.FromResult<CashCorrectionEvent>(new CashCorrectionEvent());
public interface IEventRepository<T> : IDisposable
Task AddAsync(T cashCorrectionEvent);
public class CashCorrectionEventRepository : IEventRepository<CashCorrectionEvent>
public Task AddAsync(CashCorrectionEvent cashCorrectionEvent) => Task.CompletedTask;
internal sealed class EventsListener
private CashCorrectionEventRepository repository;
private IEventsConsumer<CashCorrectionEvent> _eventsConsumer;
repository = new CashCorrectionEventRepository();
_eventsConsumer = new CashCorrectionEventsConsumer();
Task.Run(() => ListenerRoutine());
private void ListenerRoutine()
var cashCorrectionEvent = _eventsConsumer.ConsumeAsync(CancellationToken.None).Result;
ValidateEvent(cashCorrectionEvent);
repository.AddAsync(cashCorrectionEvent);
private void ValidateEvent(CashCorrectionEvent cashCorrectionEvent)
if (cashCorrectionEvent.TradeSide == "BUY" && cashCorrectionEvent.Amount < 0) throw new Exception();
if (cashCorrectionEvent.TradeSide == "SELL" && cashCorrectionEvent.Amount > 0) throw new Exception();
_eventsConsumer.Dispose();