diff --git a/CleanArchitecture.Api/CleanArchitecture.Api.csproj b/CleanArchitecture.Api/CleanArchitecture.Api.csproj
index d0c2167..8fe222d 100644
--- a/CleanArchitecture.Api/CleanArchitecture.Api.csproj
+++ b/CleanArchitecture.Api/CleanArchitecture.Api.csproj
@@ -14,16 +14,18 @@
-
-
-
-
+
+
+
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
+
-
+
diff --git a/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs b/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs
index a539f68..99551e6 100644
--- a/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs
+++ b/CleanArchitecture.Api/Extensions/ConfigurationExtensions.cs
@@ -1,5 +1,5 @@
using System;
-using CleanArchitecture.Domain.Rabbitmq;
+using CleanArchitecture.Domain.Settings;
using Microsoft.Extensions.Configuration;
namespace CleanArchitecture.Api.Extensions;
@@ -11,7 +11,6 @@ public static class ConfigurationExtensions
{
var isAspire = configuration["ASPIRE_ENABLED"] == "true";
- var rabbitEnabled = configuration["RabbitMQ:Enabled"];
var rabbitHost = configuration["RabbitMQ:Host"];
var rabbitPort = configuration["RabbitMQ:Port"];
var rabbitUser = configuration["RabbitMQ:Username"];
@@ -19,7 +18,6 @@ public static class ConfigurationExtensions
if (isAspire)
{
- rabbitEnabled = "true";
var connectionString = configuration["ConnectionStrings:RabbitMq"];
var rabbitUri = new Uri(connectionString!);
@@ -33,7 +31,6 @@ public static class ConfigurationExtensions
{
Host = rabbitHost ?? "",
Port = int.Parse(rabbitPort ?? "0"),
- Enabled = bool.Parse(rabbitEnabled ?? "false"),
Username = rabbitUser ?? "",
Password = rabbitPass ?? ""
};
diff --git a/CleanArchitecture.Api/Program.cs b/CleanArchitecture.Api/Program.cs
index 4c7772a..7b96320 100644
--- a/CleanArchitecture.Api/Program.cs
+++ b/CleanArchitecture.Api/Program.cs
@@ -4,19 +4,21 @@ using CleanArchitecture.Api.BackgroundServices;
using CleanArchitecture.Api.Extensions;
using CleanArchitecture.Application.Extensions;
using CleanArchitecture.Application.gRPC;
+using CleanArchitecture.Domain.Consumers;
using CleanArchitecture.Domain.Extensions;
-using CleanArchitecture.Domain.Rabbitmq.Extensions;
using CleanArchitecture.Infrastructure.Database;
using CleanArchitecture.Infrastructure.Extensions;
using CleanArchitecture.ServiceDefaults;
using HealthChecks.ApplicationStatus.DependencyInjection;
using HealthChecks.UI.Client;
+using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
using RabbitMQ.Client;
var builder = WebApplication.CreateBuilder(args);
@@ -82,7 +84,48 @@ builder.Services.AddCommandHandlers();
builder.Services.AddNotificationHandlers();
builder.Services.AddApiUser();
-builder.Services.AddRabbitMqHandler(rabbitConfiguration);
+builder.Services.AddMassTransit(x =>
+{
+ x.AddConsumer();
+ x.AddConsumer();
+
+ x.UsingRabbitMq((context, cfg) =>
+ {
+ cfg.ConfigureNewtonsoftJsonSerializer(settings =>
+ {
+ settings.TypeNameHandling = TypeNameHandling.Objects;
+ settings.NullValueHandling = NullValueHandling.Ignore;
+ return settings;
+ });
+ cfg.UseNewtonsoftJsonSerializer();
+ cfg.ConfigureNewtonsoftJsonDeserializer(settings =>
+ {
+ settings.TypeNameHandling = TypeNameHandling.Objects;
+ settings.NullValueHandling = NullValueHandling.Ignore;
+ return settings;
+ });
+
+ cfg.Host(rabbitConfiguration.Host, (ushort)rabbitConfiguration.Port, "/", h => {
+ h.Username(rabbitConfiguration.Username);
+ h.Password(rabbitConfiguration.Password);
+ });
+
+ // Every instance of the service will receive the message
+ cfg.ReceiveEndpoint("clean-architecture-fanout-event-" + Guid.NewGuid(), e =>
+ {
+ e.Durable = false;
+ e.AutoDelete = true;
+ e.ConfigureConsumer(context);
+ e.DiscardSkippedMessages();
+ });
+ cfg.ReceiveEndpoint("clean-architecture-fanout-events", e =>
+ {
+ e.ConfigureConsumer(context);
+ e.DiscardSkippedMessages();
+ });
+ cfg.ConfigureEndpoints(context);
+ });
+});
builder.Services.AddHostedService();
@@ -148,7 +191,7 @@ app.MapControllers();
app.MapGrpcService();
app.MapGrpcService();
-app.Run();
+await app.RunAsync();
// Needed for integration tests web application factory
public partial class Program
diff --git a/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj b/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj
index af6c235..ea4a382 100644
--- a/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj
+++ b/CleanArchitecture.AppHost/CleanArchitecture.AppHost.csproj
@@ -15,7 +15,7 @@
-
+
diff --git a/CleanArchitecture.Application/CleanArchitecture.Application.csproj b/CleanArchitecture.Application/CleanArchitecture.Application.csproj
index 314f212..913391e 100644
--- a/CleanArchitecture.Application/CleanArchitecture.Application.csproj
+++ b/CleanArchitecture.Application/CleanArchitecture.Application.csproj
@@ -6,7 +6,7 @@
-
+
diff --git a/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj b/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj
index 0b85c1b..cae4f95 100644
--- a/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj
+++ b/CleanArchitecture.Domain/CleanArchitecture.Domain.csproj
@@ -8,11 +8,11 @@
+
-
+
-
-
+
diff --git a/CleanArchitecture.Domain/Constants/Messaging.cs b/CleanArchitecture.Domain/Constants/Messaging.cs
deleted file mode 100644
index 8d2f401..0000000
--- a/CleanArchitecture.Domain/Constants/Messaging.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-namespace CleanArchitecture.Domain.Constants;
-
-public sealed class Messaging
-{
- public const string ExchangeNameNotifications = "exchange-notifications";
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs b/CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs
new file mode 100644
index 0000000..b5d4487
--- /dev/null
+++ b/CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs
@@ -0,0 +1,22 @@
+using System.Threading.Tasks;
+using CleanArchitecture.Shared.Events;
+using MassTransit;
+using Microsoft.Extensions.Logging;
+
+namespace CleanArchitecture.Domain.Consumers;
+
+public sealed class FanoutEventConsumer : IConsumer
+{
+ private readonly ILogger _logger;
+
+ public FanoutEventConsumer(ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ public Task Consume(ConsumeContext context)
+ {
+ _logger.LogInformation("FanoutDomainEventConsumer: {FanoutDomainEvent}", context.Message);
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs b/CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs
new file mode 100644
index 0000000..5eec961
--- /dev/null
+++ b/CleanArchitecture.Domain/Consumers/TenantUpdatedEventConsumer.cs
@@ -0,0 +1,22 @@
+using System.Threading.Tasks;
+using CleanArchitecture.Shared.Events.Tenant;
+using MassTransit;
+using Microsoft.Extensions.Logging;
+
+namespace CleanArchitecture.Domain.Consumers;
+
+public sealed class TenantUpdatedEventConsumer : IConsumer
+{
+ private readonly ILogger _logger;
+
+ public TenantUpdatedEventConsumer(ILogger logger)
+ {
+ _logger = logger;
+ }
+
+ public Task Consume(ConsumeContext context)
+ {
+ _logger.LogInformation("TenantUpdatedEventConsumer: {TenantId}", context.Message.AggregateId);
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs b/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs
index 8d3ae4d..0744209 100644
--- a/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs
+++ b/CleanArchitecture.Domain/EventHandler/Fanout/FanoutEventHandler.cs
@@ -1,27 +1,33 @@
using System.Threading.Tasks;
-using CleanArchitecture.Domain.Constants;
-using CleanArchitecture.Domain.Rabbitmq;
+using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Shared.Events;
+using MassTransit;
namespace CleanArchitecture.Domain.EventHandler.Fanout;
public sealed class FanoutEventHandler : IFanoutEventHandler
{
- private readonly RabbitMqHandler _rabbitMqHandler;
+ private readonly IPublishEndpoint _massTransit;
+ private readonly IUser _user;
public FanoutEventHandler(
- RabbitMqHandler rabbitMqHandler)
+ IPublishEndpoint massTransit, IUser user)
{
- _rabbitMqHandler = rabbitMqHandler;
- _rabbitMqHandler.InitializeExchange(Messaging.ExchangeNameNotifications);
+ _massTransit = massTransit;
+ _user = user;
}
- public Task HandleDomainEventAsync(DomainEvent @event)
+ public async Task HandleDomainEventAsync(T @event) where T : DomainEvent
{
- _rabbitMqHandler.EnqueueExchangeMessage(
- Messaging.ExchangeNameNotifications,
- @event);
+ var fanoutDomainEvent =
+ new FanoutDomainEvent(
+ @event.AggregateId,
+ @event,
+ _user.GetUserId());
+
+ await _massTransit.Publish(fanoutDomainEvent);
+ await _massTransit.Publish(@event);
- return Task.FromResult(@event);
+ return @event;
}
}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs b/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs
index 26453ac..2ff8002 100644
--- a/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs
+++ b/CleanArchitecture.Domain/EventHandler/Fanout/IFanoutEventHandler.cs
@@ -5,5 +5,5 @@ namespace CleanArchitecture.Domain.EventHandler.Fanout;
public interface IFanoutEventHandler
{
- Task HandleDomainEventAsync(DomainEvent @event);
+ Task HandleDomainEventAsync(T @event) where T : DomainEvent;
}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs
deleted file mode 100644
index fc05e05..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/BindQueueToExchange.cs
+++ /dev/null
@@ -1,23 +0,0 @@
-using System.Threading.Tasks;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public sealed class BindQueueToExchange : IRabbitMqAction
-{
- private readonly string _exchangeName;
- private readonly string _queueName;
- private readonly string _routingKey;
-
- public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
- {
- _exchangeName = exchangeName;
- _routingKey = routingKey;
- _queueName = queueName;
- }
-
- public async Task Perform(IChannel channel)
- {
- await channel.QueueBindAsync(_queueName, _exchangeName, _routingKey);
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs
deleted file mode 100644
index 6b43659..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateExchange.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-using System.Threading.Tasks;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public sealed class CreateExchange : IRabbitMqAction
-{
- private readonly string _name;
- private readonly string _type;
-
- public CreateExchange(string name, string type)
- {
- _name = name;
- _type = type;
- }
-
- public async Task Perform(IChannel channel)
- {
- await channel.ExchangeDeclareAsync(_name, _type);
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs
deleted file mode 100644
index 0f69898..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/CreateQueue.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-using System.Threading.Tasks;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public sealed class CreateQueue : IRabbitMqAction
-{
- public string QueueName { get; }
-
- public CreateQueue(string queueName)
- {
- QueueName = queueName;
- }
-
- public async Task Perform(IChannel channel)
- {
- await channel.QueueDeclareAsync(
- QueueName,
- false,
- false,
- false,
- null);
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs
deleted file mode 100644
index feacd7b..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/IRabbitMqAction.cs
+++ /dev/null
@@ -1,9 +0,0 @@
-using System.Threading.Tasks;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public interface IRabbitMqAction
-{
- Task Perform(IChannel channel);
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs
deleted file mode 100644
index 4fcd40d..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/RegisterConsumer.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-using System;
-using System.Threading.Tasks;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public sealed class RegisterConsumer : IRabbitMqAction
-{
- private readonly Func _addConsumer;
- private readonly ConsumeEventHandler _consumer;
- private readonly string _exchange;
- private readonly string _queue;
- private readonly string _routingKey;
-
- public RegisterConsumer(
- string exchange,
- string queue,
- string routingKey,
- ConsumeEventHandler consumer,
- Func addConsumer)
- {
- _exchange = exchange;
- _queue = queue;
- _routingKey = routingKey;
- _consumer = consumer;
- _addConsumer = addConsumer;
- }
-
- public async Task Perform(IChannel channel)
- {
- await _addConsumer(_exchange, _queue, _routingKey, _consumer);
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs
deleted file mode 100644
index 8233d6d..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/SendAcknowledgement.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-using System.Threading.Tasks;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public sealed class SendAcknowledgement : IRabbitMqAction
-{
- public ulong DeliveryTag { get; }
-
- public SendAcknowledgement(ulong deliveryTag)
- {
- DeliveryTag = deliveryTag;
- }
-
- public async Task Perform(IChannel channel)
- {
- await channel.BasicAckAsync(DeliveryTag, false);
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs b/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs
deleted file mode 100644
index e407fc6..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Actions/SendMessage.cs
+++ /dev/null
@@ -1,38 +0,0 @@
-using System.Text;
-using System.Threading.Tasks;
-using Newtonsoft.Json;
-using RabbitMQ.Client;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Actions;
-
-public sealed class SendMessage : IRabbitMqAction
-{
- private static readonly JsonSerializerSettings s_serializerSettings =
- new() { TypeNameHandling = TypeNameHandling.Objects };
-
- private readonly string _exchange;
- private readonly object _message;
-
- private readonly string _routingKey;
-
-
- /// If exchange is empty, this is the name of the queue
- public SendMessage(string routingKey, string exchange, object message)
- {
- _routingKey = routingKey;
- _exchange = exchange;
- _message = message;
- }
-
- public async Task Perform(IChannel channel)
- {
- var json = JsonConvert.SerializeObject(_message, s_serializerSettings);
-
- var content = Encoding.UTF8.GetBytes(json);
-
- await channel.BasicPublishAsync(
- _exchange,
- _routingKey,
- content);
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Delegates.cs b/CleanArchitecture.Domain/Rabbitmq/Delegates.cs
deleted file mode 100644
index 70e87c7..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Delegates.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-using System;
-using System.Threading.Tasks;
-
-namespace CleanArchitecture.Domain.Rabbitmq;
-
-public delegate Task ConsumeEventHandler(ReadOnlyMemory content);
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs b/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs
deleted file mode 100644
index a785525..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/Extensions/ServiceCollectionExtensions.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using Microsoft.Extensions.DependencyInjection;
-
-namespace CleanArchitecture.Domain.Rabbitmq.Extensions;
-
-public static class ServiceCollectionExtensions
-{
- public static IServiceCollection AddRabbitMqHandler(
- this IServiceCollection services,
- RabbitMqConfiguration configuration)
- {
- services.AddSingleton(configuration);
-
- services.AddSingleton();
- services.AddHostedService(serviceProvider => serviceProvider.GetService()!);
-
- return services;
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs b/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs
deleted file mode 100644
index 1d70e99..0000000
--- a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs
+++ /dev/null
@@ -1,240 +0,0 @@
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-using CleanArchitecture.Domain.Rabbitmq.Actions;
-using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Logging;
-using RabbitMQ.Client;
-using RabbitMQ.Client.Events;
-
-namespace CleanArchitecture.Domain.Rabbitmq;
-
-public sealed class RabbitMqHandler : BackgroundService
-{
- private readonly RabbitMqConfiguration _configuration;
-
- private readonly ConcurrentDictionary> _consumers = new();
-
- private readonly ILogger _logger;
-
- private readonly ConcurrentQueue _pendingActions = new();
- private IChannel? _channel;
-
- public RabbitMqHandler(
- RabbitMqConfiguration configuration,
- ILogger logger)
- {
- _configuration = configuration;
- _logger = logger;
- }
-
- public override async Task StartAsync(CancellationToken cancellationToken)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Connection will not be established");
- return;
- }
-
- _logger.LogInformation("Starting RabbitMQ connection");
-
- var factory = new ConnectionFactory
- {
- AutomaticRecoveryEnabled = true,
- HostName = _configuration.Host,
- Port = _configuration.Port,
- UserName = _configuration.Username,
- Password = _configuration.Password
- };
-
- var connection = await factory.CreateConnectionAsync(cancellationToken);
- _channel = await connection.CreateChannelAsync(null, cancellationToken);
-
- await base.StartAsync(cancellationToken);
- }
-
-
- public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.",
- exchangeName);
- return;
- }
-
- _pendingActions.Enqueue(new CreateExchange(exchangeName, type));
- }
-
- public void InitializeQueues(params string[] queueNames)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues.");
- return;
- }
-
- foreach (var queue in queueNames)
- {
- _pendingActions.Enqueue(new CreateQueue(queue));
- }
- }
-
- public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange.");
- return;
- }
-
- _pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey));
- }
-
- public void AddConsumer(string queueName, ConsumeEventHandler consumer)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer.");
- return;
- }
-
- // routingKey is set to queueName to mimic rabbitMQ
- _pendingActions.Enqueue(
- new RegisterConsumer(
- string.Empty,
- queueName,
- queueName,
- consumer,
- AddEventConsumer));
- }
-
- public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer.");
- return;
- }
-
- _pendingActions.Enqueue(
- new RegisterConsumer(
- exchange,
- queue,
- routingKey,
- consumer,
- AddEventConsumer));
- }
-
- public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer)
- {
- AddExchangeConsumer(exchange, string.Empty, queue, consumer);
- }
-
- private async Task AddEventConsumer(string exchange, string queueName, string routingKey,
- ConsumeEventHandler consumer)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Event consumer will not be added.");
- return;
- }
-
- var key = $"{exchange}-{routingKey}";
-
- if (!_consumers.TryGetValue(key, out var consumers))
- {
- consumers = new List();
- _consumers.TryAdd(key, consumers);
-
- var eventHandler = new AsyncEventingBasicConsumer(_channel!);
- eventHandler.ReceivedAsync += CallEventConsumersAsync;
-
- await _channel!.BasicConsumeAsync(queueName, false, eventHandler);
- }
-
- consumers.Add(consumer);
- }
-
- private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea)
- {
- var key = $"{ea.Exchange}-{ea.RoutingKey}";
-
- if (!_consumers.TryGetValue(key, out var consumers))
- {
- return;
- }
-
- foreach (var consumer in consumers)
- {
- try
- {
- await consumer(ea.Body);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error while handling event in queue {RoutingKey}", ea.RoutingKey);
- }
- }
-
- _pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag));
- }
-
-
- public void EnqueueMessage(string queueName, object message)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
- return;
- }
-
- _pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message));
- }
-
- public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "")
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
- return;
- }
-
- _pendingActions.Enqueue(new SendMessage(routingKey, exchange, message));
- }
-
-
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- if (!_configuration.Enabled)
- {
- _logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started");
- return;
- }
-
- while (true)
- {
- await HandleEnqueuedActions();
-
- await Task.Delay(1000, stoppingToken);
- }
- }
-
- private async Task HandleEnqueuedActions()
- {
- while (_pendingActions.TryDequeue(out var action))
- {
- try
- {
- await action.Perform(_channel!);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error while trying to send a rabbitmq message");
- _pendingActions.Enqueue(action);
- }
- }
- }
-}
\ No newline at end of file
diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs b/CleanArchitecture.Domain/Settings/RabbitMqConfiguration.cs
similarity index 80%
rename from CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs
rename to CleanArchitecture.Domain/Settings/RabbitMqConfiguration.cs
index b5771e4..1db8a17 100644
--- a/CleanArchitecture.Domain/Rabbitmq/RabbitMqConfiguration.cs
+++ b/CleanArchitecture.Domain/Settings/RabbitMqConfiguration.cs
@@ -1,10 +1,9 @@
-namespace CleanArchitecture.Domain.Rabbitmq;
+namespace CleanArchitecture.Domain.Settings;
public sealed class RabbitMqConfiguration
{
public string Host { get; set; } = string.Empty;
public int Port { get; set; }
- public bool Enabled { get; set; }
public string Username { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
diff --git a/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj b/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj
index e7d2b11..b71d224 100644
--- a/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj
+++ b/CleanArchitecture.Infrastructure/CleanArchitecture.Infrastructure.csproj
@@ -12,10 +12,10 @@
-
-
-
-
+
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj b/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj
index efe574e..e02b8b1 100644
--- a/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj
+++ b/CleanArchitecture.IntegrationTests/CleanArchitecture.IntegrationTests.csproj
@@ -13,8 +13,8 @@
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
diff --git a/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs b/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs
index 8bce947..3c5bc34 100644
--- a/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs
+++ b/CleanArchitecture.IntegrationTests/GlobalSetupFixture.cs
@@ -66,7 +66,7 @@ internal class GlobalSetupFixture
catch (Exception ex)
{
// Creation of the respawner can fail if the database has not been created yet
- TestContext.WriteLine($"Failed to create respawner: {ex.Message}");
+ await TestContext.Out.WriteLineAsync($"Failed to create respawner: {ex.Message}");
}
}
diff --git a/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj b/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj
index 4ee4ccd..72b9e8b 100644
--- a/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj
+++ b/CleanArchitecture.Proto/CleanArchitecture.Proto.csproj
@@ -13,9 +13,9 @@
-
-
-
+
+
+
diff --git a/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj b/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj
index ff91ee2..7208282 100644
--- a/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj
+++ b/CleanArchitecture.ServiceDefaults/CleanArchitecture.ServiceDefaults.csproj
@@ -10,15 +10,15 @@
-
+
-
-
-
+
+
+
-
-
+
+
diff --git a/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj b/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj
index fcab850..077e614 100644
--- a/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj
+++ b/CleanArchitecture.Shared/CleanArchitecture.Shared.csproj
@@ -7,6 +7,7 @@
+
diff --git a/CleanArchitecture.Shared/Events/DomainEvent.cs b/CleanArchitecture.Shared/Events/DomainEvent.cs
index f541549..3b8938a 100644
--- a/CleanArchitecture.Shared/Events/DomainEvent.cs
+++ b/CleanArchitecture.Shared/Events/DomainEvent.cs
@@ -1,8 +1,10 @@
using System;
+using MassTransit;
using MediatR;
namespace CleanArchitecture.Shared.Events;
+[ExcludeFromTopology]
public abstract class DomainEvent : Message, INotification
{
public DateTime Timestamp { get; private set; }
diff --git a/CleanArchitecture.Shared/Events/FanoutDomainEvent.cs b/CleanArchitecture.Shared/Events/FanoutDomainEvent.cs
new file mode 100644
index 0000000..baf73d5
--- /dev/null
+++ b/CleanArchitecture.Shared/Events/FanoutDomainEvent.cs
@@ -0,0 +1,18 @@
+using System;
+
+namespace CleanArchitecture.Shared.Events;
+
+public class FanoutDomainEvent : DomainEvent
+{
+ public DomainEvent DomainEvent { get; }
+ public Guid? UserId { get; }
+
+ public FanoutDomainEvent(
+ Guid aggregateId,
+ DomainEvent domainEvent,
+ Guid? userId) : base(aggregateId)
+ {
+ DomainEvent = domainEvent;
+ UserId = userId;
+ }
+}