using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Caching.Distributed;
using System.Threading.Tasks;
namespace RealTimeAnalytics
public int Id { get; set; }
public string UserId { get; set; }
public double Amount { get; set; }
public DateTime Timestamp { get; set; }
public class AnalyticsHub : Hub
public async Task SendAnalyticsUpdate(string message)
await Clients.All.SendAsync("ReceiveMessage", message);
public class TransactionProcessingService : BackgroundService
private readonly IQueueClient _queueClient;
private readonly ILogger<TransactionProcessingService> _logger;
private readonly IHubContext<AnalyticsHub> _hubContext;
private readonly IDistributedCache _distributedCache;
public TransactionProcessingService(IQueueClient queueClient, ILogger<TransactionProcessingService> logger, IHubContext<AnalyticsHub> hubContext, IDistributedCache distributedCache)
_queueClient = queueClient;
_hubContext = hubContext;
_distributedCache = distributedCache;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
_queueClient.RegisterMessageHandler(
async (message, token) =>
var transactionJson = Encoding.UTF8.GetString(message.Body);
var transaction = JsonConvert.DeserializeObject<Transaction>(transactionJson);
if (!IsValidTransaction(transaction))
_logger.LogError($"Invalid transaction: {transactionJson}");
await _queueClient.DeadLetterAsync(message.SystemProperties.LockToken);
await ProcessTransaction(transaction);
_logger.LogError(ex, $"Error processing transaction: {transactionJson}");
await _queueClient.AbandonAsync(message.SystemProperties.LockToken);
await _hubContext.Clients.All.SendAsync("ReceiveMessage", $"Processed transaction for User {transaction.UserId}, Amount: {transaction.Amount}");
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
new MessageHandlerOptions(exceptionArgs =>
_logger.LogError(exceptionArgs.Exception, "Error processing message.");
return Task.CompletedTask;
{ MaxConcurrentCalls = 10, AutoComplete = false });
private bool IsValidTransaction(Transaction transaction)
return transaction != null
&& !string.IsNullOrEmpty(transaction.UserId)
&& transaction.Amount > 0
&& transaction.Timestamp != DateTime.MinValue;
private async Task ProcessTransaction(Transaction transaction)
var retryPolicy = Policy.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await retryPolicy.ExecuteAsync(async () =>
var transactionJson = JsonConvert.SerializeObject(transaction);
await _distributedCache.SetStringAsync($"transaction:{transaction.Id}", transactionJson);
_logger.LogInformation($"Processed transaction for User {transaction.UserId}, Amount: {transaction.Amount}");
public Startup(IConfiguration configuration)
Configuration = configuration;
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
services.AddSingleton<IQueueClient>(x =>
new QueueClient(Configuration["AzureServiceBusConnectionString"], "transactions"));
services.AddHostedService<TransactionProcessingService>();
services.AddStackExchangeRedisCache(options =>
options.Configuration = Configuration["RedisConnectionString"];
options.InstanceName = "RealTimeAnalytics:";
services.AddControllers();
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
app.UseDeveloperExceptionPage();
app.UseEndpoints(endpoints =>
endpoints.MapHub<AnalyticsHub>("/analyticsHub");
endpoints.MapControllers();
public class TransactionController : Microsoft.AspNetCore.Mvc.ControllerBase
private readonly IQueueClient _queueClient;
public TransactionController(IQueueClient queueClient)
_queueClient = queueClient;
[Microsoft.AspNetCore.Mvc.HttpPost("api/transaction")]
public async Task<IActionResult> PostTransaction([Microsoft.AspNetCore.Mvc.FromBody] Transaction transaction)
if (!IsValidTransaction(transaction))
return BadRequest("Invalid transaction");
var messageBody = JsonConvert.SerializeObject(transaction);
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
var retryPolicy = Policy.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await retryPolicy.ExecuteAsync(async () =>
await _queueClient.SendAsync(message);
return Ok("Transaction processed.");
private bool IsValidTransaction(Transaction transaction)
return transaction != null
&& !string.IsNullOrEmpty(transaction.UserId)
&& transaction.Amount > 0
&& transaction.Timestamp != DateTime.MinValue;
public static void Main(string[] args)
CreateHostBuilder(args).Build().Run();
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration((context, config) =>
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
.ConfigureWebHostDefaults(webBuilder =>
webBuilder.UseStartup<Startup>();