From 74549806d61b2c7c990823c0849b47e609b69b18 Mon Sep 17 00:00:00 2001 From: alex289 Date: Sat, 15 Mar 2025 22:56:21 +0100 Subject: [PATCH 1/3] feat: MassTransit --- .../CleanArchitecture.Api.csproj | 14 +- .../Extensions/ConfigurationExtensions.cs | 5 +- CleanArchitecture.Api/Program.cs | 49 +++- .../CleanArchitecture.AppHost.csproj | 2 +- .../CleanArchitecture.Application.csproj | 2 +- .../CleanArchitecture.Domain.csproj | 6 +- .../Constants/Messaging.cs | 6 - .../Consumers/FanoutEventConsumer.cs | 22 ++ .../Consumers/TenantUpdatedEventConsumer.cs | 22 ++ .../EventHandler/Fanout/FanoutEventHandler.cs | 28 +- .../Fanout/IFanoutEventHandler.cs | 2 +- .../Rabbitmq/Actions/BindQueueToExchange.cs | 23 -- .../Rabbitmq/Actions/CreateExchange.cs | 21 -- .../Rabbitmq/Actions/CreateQueue.cs | 24 -- .../Rabbitmq/Actions/IRabbitMqAction.cs | 9 - .../Rabbitmq/Actions/RegisterConsumer.cs | 33 --- .../Rabbitmq/Actions/SendAcknowledgement.cs | 19 -- .../Rabbitmq/Actions/SendMessage.cs | 38 --- .../Rabbitmq/Delegates.cs | 6 - .../Extensions/ServiceCollectionExtensions.cs | 18 -- .../Rabbitmq/RabbitMqHandler.cs | 240 ------------------ .../RabbitMqConfiguration.cs | 3 +- .../CleanArchitecture.Infrastructure.csproj | 8 +- .../CleanArchitecture.IntegrationTests.csproj | 4 +- .../GlobalSetupFixture.cs | 2 +- .../CleanArchitecture.Proto.csproj | 6 +- .../CleanArchitecture.ServiceDefaults.csproj | 12 +- .../CleanArchitecture.Shared.csproj | 1 + .../Events/DomainEvent.cs | 2 + .../Events/FanoutDomainEvent.cs | 18 ++ 30 files changed, 160 insertions(+), 485 deletions(-) delete mode 100644 CleanArchitecture.Domain/Constants/Messaging.cs create mode 100644 CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs create mode 100644 CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Delegates.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs delete mode 100644 CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs rename CleanArchitecture.Domain/{Rabbitmq => Settings}/RabbitMqConfiguration.cs (80%) create mode 100644 CleanArchitecture.Shared/Events/FanoutDomainEvent.cs diff --git a/CleanArchitecture.Api/CleanArchitecture.Api.csproj b/CleanArchitecture.Api/CleanArchitecture.Api.csproj index d0c2167..8fe222d 100644 --- a/CleanArchitecture.Api/CleanArchitecture.Api.csproj +++ b/CleanArchitecture.Api/CleanArchitecture.Api.csproj @@ -14,16 +14,18 @@ - - - - + + + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + - + diff --git a/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs b/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs index a539f68..99551e6 100644 --- a/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs +++ b/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs @@ -1,5 +1,5 @@ using System; -using CleanArchitecture.Domain.Rabbitmq; +using CleanArchitecture.Domain.Settings; using Microsoft.Extensions.Configuration; namespace CleanArchitecture.Api.Extensions; @@ -11,7 +11,6 @@ public static class ConfigurationExtensions { var isAspire = configuration["ASPIRE_ENABLED"] == "true"; - var rabbitEnabled = configuration["RabbitMQ:Enabled"]; var rabbitHost = configuration["RabbitMQ:Host"]; var rabbitPort = configuration["RabbitMQ:Port"]; var rabbitUser = configuration["RabbitMQ:Username"]; @@ -19,7 +18,6 @@ public static class ConfigurationExtensions if (isAspire) { - rabbitEnabled = "true"; var connectionString = configuration["ConnectionStrings:RabbitMq"]; var rabbitUri = new Uri(connectionString!); @@ -33,7 +31,6 @@ public static class ConfigurationExtensions { Host = rabbitHost ?? "", Port = int.Parse(rabbitPort ?? "0"), - Enabled = bool.Parse(rabbitEnabled ?? "false"), Username = rabbitUser ?? "", Password = rabbitPass ?? "" }; diff --git a/CleanArchitecture.Api/Program.cs b/CleanArchitecture.Api/Program.cs index 4c7772a..7b96320 100644 --- a/CleanArchitecture.Api/Program.cs +++ b/CleanArchitecture.Api/Program.cs @@ -4,19 +4,21 @@ using CleanArchitecture.Api.BackgroundServices; using CleanArchitecture.Api.Extensions; using CleanArchitecture.Application.Extensions; using CleanArchitecture.Application.gRPC; +using CleanArchitecture.Domain.Consumers; using CleanArchitecture.Domain.Extensions; -using CleanArchitecture.Domain.Rabbitmq.Extensions; using CleanArchitecture.Infrastructure.Database; using CleanArchitecture.Infrastructure.Extensions; using CleanArchitecture.ServiceDefaults; using HealthChecks.ApplicationStatus.DependencyInjection; using HealthChecks.UI.Client; +using MassTransit; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using RabbitMQ.Client; var builder = WebApplication.CreateBuilder(args); @@ -82,7 +84,48 @@ builder.Services.AddCommandHandlers(); builder.Services.AddNotificationHandlers(); builder.Services.AddApiUser(); -builder.Services.AddRabbitMqHandler(rabbitConfiguration); +builder.Services.AddMassTransit(x => +{ + x.AddConsumer(); + x.AddConsumer(); + + x.UsingRabbitMq((context, cfg) => + { + cfg.ConfigureNewtonsoftJsonSerializer(settings => + { + settings.TypeNameHandling = TypeNameHandling.Objects; + settings.NullValueHandling = NullValueHandling.Ignore; + return settings; + }); + cfg.UseNewtonsoftJsonSerializer(); + cfg.ConfigureNewtonsoftJsonDeserializer(settings => + { + settings.TypeNameHandling = TypeNameHandling.Objects; + settings.NullValueHandling = NullValueHandling.Ignore; + return settings; + }); + + cfg.Host(rabbitConfiguration.Host, (ushort)rabbitConfiguration.Port, "/", h => { + h.Username(rabbitConfiguration.Username); + h.Password(rabbitConfiguration.Password); + }); + + // Every instance of the service will receive the message + cfg.ReceiveEndpoint("clean-architecture-fanout-event-" + Guid.NewGuid(), e => + { + e.Durable = false; + e.AutoDelete = true; + e.ConfigureConsumer(context); + e.DiscardSkippedMessages(); + }); + cfg.ReceiveEndpoint("clean-architecture-fanout-events", e => + { + e.ConfigureConsumer(context); + e.DiscardSkippedMessages(); + }); + cfg.ConfigureEndpoints(context); + }); +}); builder.Services.AddHostedService(); @@ -148,7 +191,7 @@ app.MapControllers(); app.MapGrpcService(); app.MapGrpcService(); -app.Run(); +await app.RunAsync(); // Needed for integration tests web application factory public partial class Program diff --git a/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj b/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj index af6c235..ea4a382 100644 --- a/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj +++ b/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj @@ -15,7 +15,7 @@ - + diff --git a/CleanArchitecture.Application/CleanArchitecture.Application.csproj b/CleanArchitecture.Application/CleanArchitecture.Application.csproj index 314f212..913391e 100644 --- a/CleanArchitecture.Application/CleanArchitecture.Application.csproj +++ b/CleanArchitecture.Application/CleanArchitecture.Application.csproj @@ -6,7 +6,7 @@ - + diff --git a/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj b/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj index 0b85c1b..cae4f95 100644 --- a/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj +++ b/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj @@ -8,11 +8,11 @@ + - + - - + diff --git a/CleanArchitecture.Domain/Constants/Messaging.cs b/CleanArchitecture.Domain/Constants/Messaging.cs deleted file mode 100644 index 8d2f401..0000000 --- a/CleanArchitecture.Domain/Constants/Messaging.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace CleanArchitecture.Domain.Constants; - -public sealed class Messaging -{ - public const string ExchangeNameNotifications = "exchange-notifications"; -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs b/CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs new file mode 100644 index 0000000..b5d4487 --- /dev/null +++ b/CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs @@ -0,0 +1,22 @@ +using System.Threading.Tasks; +using CleanArchitecture.Shared.Events; +using MassTransit; +using Microsoft.Extensions.Logging; + +namespace CleanArchitecture.Domain.Consumers; + +public sealed class FanoutEventConsumer : IConsumer +{ + private readonly ILogger _logger; + + public FanoutEventConsumer(ILogger logger) + { + _logger = logger; + } + + public Task Consume(ConsumeContext context) + { + _logger.LogInformation("FanoutDomainEventConsumer: {FanoutDomainEvent}", context.Message); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs b/CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs new file mode 100644 index 0000000..5eec961 --- /dev/null +++ b/CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs @@ -0,0 +1,22 @@ +using System.Threading.Tasks; +using CleanArchitecture.Shared.Events.Tenant; +using MassTransit; +using Microsoft.Extensions.Logging; + +namespace CleanArchitecture.Domain.Consumers; + +public sealed class TenantUpdatedEventConsumer : IConsumer +{ + private readonly ILogger _logger; + + public TenantUpdatedEventConsumer(ILogger logger) + { + _logger = logger; + } + + public Task Consume(ConsumeContext context) + { + _logger.LogInformation("TenantUpdatedEventConsumer: {TenantId}", context.Message.AggregateId); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs b/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs index 8d3ae4d..0744209 100644 --- a/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs +++ b/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs @@ -1,27 +1,33 @@ using System.Threading.Tasks; -using CleanArchitecture.Domain.Constants; -using CleanArchitecture.Domain.Rabbitmq; +using CleanArchitecture.Domain.Interfaces; using CleanArchitecture.Shared.Events; +using MassTransit; namespace CleanArchitecture.Domain.EventHandler.Fanout; public sealed class FanoutEventHandler : IFanoutEventHandler { - private readonly RabbitMqHandler _rabbitMqHandler; + private readonly IPublishEndpoint _massTransit; + private readonly IUser _user; public FanoutEventHandler( - RabbitMqHandler rabbitMqHandler) + IPublishEndpoint massTransit, IUser user) { - _rabbitMqHandler = rabbitMqHandler; - _rabbitMqHandler.InitializeExchange(Messaging.ExchangeNameNotifications); + _massTransit = massTransit; + _user = user; } - public Task HandleDomainEventAsync(DomainEvent @event) + public async Task HandleDomainEventAsync(T @event) where T : DomainEvent { - _rabbitMqHandler.EnqueueExchangeMessage( - Messaging.ExchangeNameNotifications, - @event); + var fanoutDomainEvent = + new FanoutDomainEvent( + @event.AggregateId, + @event, + _user.GetUserId()); + + await _massTransit.Publish(fanoutDomainEvent); + await _massTransit.Publish(@event); - return Task.FromResult(@event); + return @event; } } \ No newline at end of file diff --git a/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs b/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs index 26453ac..2ff8002 100644 --- a/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs +++ b/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs @@ -5,5 +5,5 @@ namespace CleanArchitecture.Domain.EventHandler.Fanout; public interface IFanoutEventHandler { - Task HandleDomainEventAsync(DomainEvent @event); + Task HandleDomainEventAsync(T @event) where T : DomainEvent; } \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs deleted file mode 100644 index fc05e05..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public sealed class BindQueueToExchange : IRabbitMqAction -{ - private readonly string _exchangeName; - private readonly string _queueName; - private readonly string _routingKey; - - public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "") - { - _exchangeName = exchangeName; - _routingKey = routingKey; - _queueName = queueName; - } - - public async Task Perform(IChannel channel) - { - await channel.QueueBindAsync(_queueName, _exchangeName, _routingKey); - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs deleted file mode 100644 index 6b43659..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public sealed class CreateExchange : IRabbitMqAction -{ - private readonly string _name; - private readonly string _type; - - public CreateExchange(string name, string type) - { - _name = name; - _type = type; - } - - public async Task Perform(IChannel channel) - { - await channel.ExchangeDeclareAsync(_name, _type); - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs deleted file mode 100644 index 0f69898..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public sealed class CreateQueue : IRabbitMqAction -{ - public string QueueName { get; } - - public CreateQueue(string queueName) - { - QueueName = queueName; - } - - public async Task Perform(IChannel channel) - { - await channel.QueueDeclareAsync( - QueueName, - false, - false, - false, - null); - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs deleted file mode 100644 index feacd7b..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public interface IRabbitMqAction -{ - Task Perform(IChannel channel); -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs deleted file mode 100644 index 4fcd40d..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public sealed class RegisterConsumer : IRabbitMqAction -{ - private readonly Func _addConsumer; - private readonly ConsumeEventHandler _consumer; - private readonly string _exchange; - private readonly string _queue; - private readonly string _routingKey; - - public RegisterConsumer( - string exchange, - string queue, - string routingKey, - ConsumeEventHandler consumer, - Func addConsumer) - { - _exchange = exchange; - _queue = queue; - _routingKey = routingKey; - _consumer = consumer; - _addConsumer = addConsumer; - } - - public async Task Perform(IChannel channel) - { - await _addConsumer(_exchange, _queue, _routingKey, _consumer); - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs deleted file mode 100644 index 8233d6d..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System.Threading.Tasks; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public sealed class SendAcknowledgement : IRabbitMqAction -{ - public ulong DeliveryTag { get; } - - public SendAcknowledgement(ulong deliveryTag) - { - DeliveryTag = deliveryTag; - } - - public async Task Perform(IChannel channel) - { - await channel.BasicAckAsync(DeliveryTag, false); - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs deleted file mode 100644 index e407fc6..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System.Text; -using System.Threading.Tasks; -using Newtonsoft.Json; -using RabbitMQ.Client; - -namespace CleanArchitecture.Domain.Rabbitmq.Actions; - -public sealed class SendMessage : IRabbitMqAction -{ - private static readonly JsonSerializerSettings s_serializerSettings = - new() { TypeNameHandling = TypeNameHandling.Objects }; - - private readonly string _exchange; - private readonly object _message; - - private readonly string _routingKey; - - - /// If exchange is empty, this is the name of the queue - public SendMessage(string routingKey, string exchange, object message) - { - _routingKey = routingKey; - _exchange = exchange; - _message = message; - } - - public async Task Perform(IChannel channel) - { - var json = JsonConvert.SerializeObject(_message, s_serializerSettings); - - var content = Encoding.UTF8.GetBytes(json); - - await channel.BasicPublishAsync( - _exchange, - _routingKey, - content); - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Delegates.cs b/CleanArchitecture.Domain/Rabbitmq/Delegates.cs deleted file mode 100644 index 70e87c7..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Delegates.cs +++ /dev/null @@ -1,6 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace CleanArchitecture.Domain.Rabbitmq; - -public delegate Task ConsumeEventHandler(ReadOnlyMemory content); \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs b/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs deleted file mode 100644 index a785525..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs +++ /dev/null @@ -1,18 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; - -namespace CleanArchitecture.Domain.Rabbitmq.Extensions; - -public static class ServiceCollectionExtensions -{ - public static IServiceCollection AddRabbitMqHandler( - this IServiceCollection services, - RabbitMqConfiguration configuration) - { - services.AddSingleton(configuration); - - services.AddSingleton(); - services.AddHostedService(serviceProvider => serviceProvider.GetService()!); - - return services; - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs b/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs deleted file mode 100644 index 1d70e99..0000000 --- a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs +++ /dev/null @@ -1,240 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using CleanArchitecture.Domain.Rabbitmq.Actions; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; - -namespace CleanArchitecture.Domain.Rabbitmq; - -public sealed class RabbitMqHandler : BackgroundService -{ - private readonly RabbitMqConfiguration _configuration; - - private readonly ConcurrentDictionary> _consumers = new(); - - private readonly ILogger _logger; - - private readonly ConcurrentQueue _pendingActions = new(); - private IChannel? _channel; - - public RabbitMqHandler( - RabbitMqConfiguration configuration, - ILogger logger) - { - _configuration = configuration; - _logger = logger; - } - - public override async Task StartAsync(CancellationToken cancellationToken) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Connection will not be established"); - return; - } - - _logger.LogInformation("Starting RabbitMQ connection"); - - var factory = new ConnectionFactory - { - AutomaticRecoveryEnabled = true, - HostName = _configuration.Host, - Port = _configuration.Port, - UserName = _configuration.Username, - Password = _configuration.Password - }; - - var connection = await factory.CreateConnectionAsync(cancellationToken); - _channel = await connection.CreateChannelAsync(null, cancellationToken); - - await base.StartAsync(cancellationToken); - } - - - public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.", - exchangeName); - return; - } - - _pendingActions.Enqueue(new CreateExchange(exchangeName, type)); - } - - public void InitializeQueues(params string[] queueNames) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues."); - return; - } - - foreach (var queue in queueNames) - { - _pendingActions.Enqueue(new CreateQueue(queue)); - } - } - - public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "") - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange."); - return; - } - - _pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey)); - } - - public void AddConsumer(string queueName, ConsumeEventHandler consumer) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer."); - return; - } - - // routingKey is set to queueName to mimic rabbitMQ - _pendingActions.Enqueue( - new RegisterConsumer( - string.Empty, - queueName, - queueName, - consumer, - AddEventConsumer)); - } - - public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer."); - return; - } - - _pendingActions.Enqueue( - new RegisterConsumer( - exchange, - queue, - routingKey, - consumer, - AddEventConsumer)); - } - - public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer) - { - AddExchangeConsumer(exchange, string.Empty, queue, consumer); - } - - private async Task AddEventConsumer(string exchange, string queueName, string routingKey, - ConsumeEventHandler consumer) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Event consumer will not be added."); - return; - } - - var key = $"{exchange}-{routingKey}"; - - if (!_consumers.TryGetValue(key, out var consumers)) - { - consumers = new List(); - _consumers.TryAdd(key, consumers); - - var eventHandler = new AsyncEventingBasicConsumer(_channel!); - eventHandler.ReceivedAsync += CallEventConsumersAsync; - - await _channel!.BasicConsumeAsync(queueName, false, eventHandler); - } - - consumers.Add(consumer); - } - - private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea) - { - var key = $"{ea.Exchange}-{ea.RoutingKey}"; - - if (!_consumers.TryGetValue(key, out var consumers)) - { - return; - } - - foreach (var consumer in consumers) - { - try - { - await consumer(ea.Body); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error while handling event in queue {RoutingKey}", ea.RoutingKey); - } - } - - _pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag)); - } - - - public void EnqueueMessage(string queueName, object message) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message"); - return; - } - - _pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message)); - } - - public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "") - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message"); - return; - } - - _pendingActions.Enqueue(new SendMessage(routingKey, exchange, message)); - } - - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - if (!_configuration.Enabled) - { - _logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started"); - return; - } - - while (true) - { - await HandleEnqueuedActions(); - - await Task.Delay(1000, stoppingToken); - } - } - - private async Task HandleEnqueuedActions() - { - while (_pendingActions.TryDequeue(out var action)) - { - try - { - await action.Perform(_channel!); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error while trying to send a rabbitmq message"); - _pendingActions.Enqueue(action); - } - } - } -} \ No newline at end of file diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs b/CleanArchitecture.Domain/Settings/RabbitMqConfiguration.cs similarity index 80% rename from CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs rename to CleanArchitecture.Domain/Settings/RabbitMqConfiguration.cs index b5771e4..1db8a17 100644 --- a/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs +++ b/CleanArchitecture.Domain/Settings/RabbitMqConfiguration.cs @@ -1,10 +1,9 @@ -namespace CleanArchitecture.Domain.Rabbitmq; +namespace CleanArchitecture.Domain.Settings; public sealed class RabbitMqConfiguration { public string Host { get; set; } = string.Empty; public int Port { get; set; } - public bool Enabled { get; set; } public string Username { get; set; } = string.Empty; public string Password { get; set; } = string.Empty; diff --git a/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj b/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj index e7d2b11..b71d224 100644 --- a/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj +++ b/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj @@ -12,10 +12,10 @@ - - - - + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj b/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj index efe574e..e02b8b1 100644 --- a/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj +++ b/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj @@ -13,8 +13,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + diff --git a/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs b/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs index 8bce947..3c5bc34 100644 --- a/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs +++ b/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs @@ -66,7 +66,7 @@ internal class GlobalSetupFixture catch (Exception ex) { // Creation of the respawner can fail if the database has not been created yet - TestContext.WriteLine($"Failed to create respawner: {ex.Message}"); + await TestContext.Out.WriteLineAsync($"Failed to create respawner: {ex.Message}"); } } diff --git a/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj b/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj index 4ee4ccd..72b9e8b 100644 --- a/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj +++ b/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj @@ -13,9 +13,9 @@ - - - + + + diff --git a/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj b/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj index ff91ee2..7208282 100644 --- a/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj +++ b/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj @@ -10,15 +10,15 @@ - + - - - + + + - - + + diff --git a/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj b/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj index fcab850..077e614 100644 --- a/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj +++ b/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj @@ -7,6 +7,7 @@ + diff --git a/CleanArchitecture.Shared/Events/DomainEvent.cs b/CleanArchitecture.Shared/Events/DomainEvent.cs index f541549..3b8938a 100644 --- a/CleanArchitecture.Shared/Events/DomainEvent.cs +++ b/CleanArchitecture.Shared/Events/DomainEvent.cs @@ -1,8 +1,10 @@ using System; +using MassTransit; using MediatR; namespace CleanArchitecture.Shared.Events; +[ExcludeFromTopology] public abstract class DomainEvent : Message, INotification { public DateTime Timestamp { get; private set; } diff --git a/CleanArchitecture.Shared/Events/FanoutDomainEvent.cs b/CleanArchitecture.Shared/Events/FanoutDomainEvent.cs new file mode 100644 index 0000000..baf73d5 --- /dev/null +++ b/CleanArchitecture.Shared/Events/FanoutDomainEvent.cs @@ -0,0 +1,18 @@ +using System; + +namespace CleanArchitecture.Shared.Events; + +public class FanoutDomainEvent : DomainEvent +{ + public DomainEvent DomainEvent { get; } + public Guid? UserId { get; } + + public FanoutDomainEvent( + Guid aggregateId, + DomainEvent domainEvent, + Guid? userId) : base(aggregateId) + { + DomainEvent = domainEvent; + UserId = userId; + } +} From f17112657c3bcaa427eca83c715410577e1a2c85 Mon Sep 17 00:00:00 2001 From: alex289 Date: Sun, 16 Mar 2025 00:23:50 +0100 Subject: [PATCH 2/3] fix: Allow services to get healthy --- .../UtilityTests/HealthChecksTests.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CleanArchitecture.IntegrationTests/UtilityTests/HealthChecksTests.cs b/CleanArchitecture.IntegrationTests/UtilityTests/HealthChecksTests.cs index ce332bd..0409b74 100644 --- a/CleanArchitecture.IntegrationTests/UtilityTests/HealthChecksTests.cs +++ b/CleanArchitecture.IntegrationTests/UtilityTests/HealthChecksTests.cs @@ -17,6 +17,9 @@ public sealed class HealthChecksTests [Test, Order(0)] public async Task Should_Return_Healthy() { + // Wait some time to let the services get healthy + await Task.Delay(2000); + var response = await _fixture.ServerClient.GetAsync("/healthz"); response.StatusCode.ShouldBe(HttpStatusCode.OK); From 18ba6d52a82df929035ca3a1c1ea9172b8041f52 Mon Sep 17 00:00:00 2001 From: alex289 Date: Sun, 16 Mar 2025 00:24:28 +0100 Subject: [PATCH 3/3] feat: Check services health while testing and developing too --- CleanArchitecture.Api/Program.cs | 36 +++++++++++++------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/CleanArchitecture.Api/Program.cs b/CleanArchitecture.Api/Program.cs index 7b96320..d8091ed 100644 --- a/CleanArchitecture.Api/Program.cs +++ b/CleanArchitecture.Api/Program.cs @@ -35,11 +35,6 @@ if (builder.Environment.IsProduction()) builder.Services.AddZenFirewall(); } -builder.Services - .AddHealthChecks() - .AddDbContextCheck() - .AddApplicationStatus(); - var isAspire = builder.Configuration["ASPIRE_ENABLED"] == "true"; var rabbitConfiguration = builder.Configuration.GetRabbitMqConfiguration(); @@ -49,23 +44,22 @@ var dbConnectionString = isAspire ? builder.Configuration["ConnectionStrings:Database"] : builder.Configuration["ConnectionStrings:DefaultConnection"]; -if (builder.Environment.IsProduction()) -{ - builder.Services - .AddHealthChecks() - .AddSqlServer(dbConnectionString!) - .AddRedis(redisConnectionString!, "Redis") - .AddRabbitMQ( - async _ => +builder.Services + .AddHealthChecks() + .AddDbContextCheck() + .AddApplicationStatus() + .AddSqlServer(dbConnectionString!) + .AddRedis(redisConnectionString!, "Redis") + .AddRabbitMQ( + async _ => + { + var factory = new ConnectionFactory { - var factory = new ConnectionFactory - { - Uri = new Uri(rabbitConfiguration.ConnectionString), - }; - return await factory.CreateConnectionAsync(); - }, - name: "RabbitMQ"); -} + Uri = new Uri(rabbitConfiguration.ConnectionString), + }; + return await factory.CreateConnectionAsync(); + }, + name: "RabbitMQ"); builder.Services.AddDbContext(options => {