0
0
mirror of https://github.com/alex289/CleanArchitecture.git synced 2025-06-30 02:31:08 +00:00

feat: Add rabbitmq

This commit is contained in:
alex289 2023-09-02 10:56:20 +02:00
parent 6d4b000bd0
commit 4746bfbef1
No known key found for this signature in database
GPG Key ID: 573F77CD2D87F863
16 changed files with 457 additions and 1 deletions

View File

@ -3,6 +3,7 @@ using CleanArchitecture.Api.Extensions;
using CleanArchitecture.Application.Extensions;
using CleanArchitecture.Application.gRPC;
using CleanArchitecture.Domain.Extensions;
using CleanArchitecture.Domain.Rabbitmq.Extensions;
using CleanArchitecture.Infrastructure.Database;
using CleanArchitecture.Infrastructure.Extensions;
using HealthChecks.ApplicationStatus.DependencyInjection;
@ -51,6 +52,8 @@ builder.Services.AddCommandHandlers();
builder.Services.AddNotificationHandlers();
builder.Services.AddApiUser();
builder.Services.AddRabbitMqHandler(builder.Configuration, "RabbitMQ");
builder.Services.AddHostedService<SetInactiveUsersService>();
builder.Services.AddMediatR(cfg => { cfg.RegisterServicesFromAssemblies(typeof(Program).Assembly); });

View File

@ -12,5 +12,11 @@
"Issuer": "CleanArchitectureServer",
"Audience": "CleanArchitectureClient",
"Secret": "sD3v061gf8BxXgmxcHssasjdlkasjd87439284)@#(*"
},
"RabbitMQ": {
"Host": "localhost",
"Username": "guest",
"Password": "guest",
"Enabled": "True"
}
}

View File

@ -0,0 +1,14 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"RabbitMQ": {
"Host": "localhost",
"Username": "guest",
"Password": "guest",
"Enabled": "False"
}
}

View File

@ -14,5 +14,11 @@
"Audience": "CleanArchitectureClient",
"Secret": "sD3v061gf8BxXgmxcHssasjdlkasjd87439284)@#(*"
},
"RedisHostName": "redis"
"RedisHostName": "redis",
"RabbitMQ": {
"Host": "rabbitmq",
"Username": "guest",
"Password": "guest",
"Enabled": "True"
}
}

View File

@ -11,6 +11,7 @@
<PackageReference Include="MediatR" Version="12.1.1"/>
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.1"/>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="6.5.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.32.1"/>
</ItemGroup>

View File

@ -0,0 +1,22 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class BindQueueToExchange : IRabbitMqAction
{
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _routingKey;
public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
{
_exchangeName = exchangeName;
_routingKey = routingKey;
_queueName = queueName;
}
public void Perform(IModel channel)
{
channel.QueueBind(_queueName, _exchangeName, _routingKey);
}
}

View File

@ -0,0 +1,20 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class CreateExchange : IRabbitMqAction
{
private readonly string _name;
private readonly string _type;
public CreateExchange(string name, string type)
{
_name = name;
_type = type;
}
public void Perform(IModel channel)
{
channel.ExchangeDeclare(_name, _type);
}
}

View File

@ -0,0 +1,23 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class CreateQueue : IRabbitMqAction
{
public string QueueName { get; }
public CreateQueue(string queueName)
{
QueueName = queueName;
}
public void Perform(IModel channel)
{
channel.QueueDeclare(
QueueName,
false,
false,
false,
null);
}
}

View File

@ -0,0 +1,8 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public interface IRabbitMqAction
{
void Perform(IModel channel);
}

View File

@ -0,0 +1,32 @@
using System;
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class RegisterConsumer : IRabbitMqAction
{
private readonly Action<string, string, string, ConsumeEventHandler> _addConsumer;
private readonly ConsumeEventHandler _consumer;
private readonly string _exchange;
private readonly string _queue;
private readonly string _routingKey;
public RegisterConsumer(
string exchange,
string queue,
string routingKey,
ConsumeEventHandler consumer,
Action<string, string, string, ConsumeEventHandler> addConsumer)
{
_exchange = exchange;
_queue = queue;
_routingKey = routingKey;
_consumer = consumer;
_addConsumer = addConsumer;
}
public void Perform(IModel channel)
{
_addConsumer(_exchange, _queue, _routingKey, _consumer);
}
}

View File

@ -0,0 +1,18 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class SendAcknowledgement : IRabbitMqAction
{
public ulong DeliveryTag { get; }
public SendAcknowledgement(ulong deliveryTag)
{
DeliveryTag = deliveryTag;
}
public void Perform(IModel channel)
{
channel.BasicAck(DeliveryTag, false);
}
}

View File

@ -0,0 +1,38 @@
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class SendMessage : IRabbitMqAction
{
private static readonly JsonSerializerSettings s_serializerSettings =
new() { TypeNameHandling = TypeNameHandling.Objects };
private readonly string _exchange;
private readonly object _message;
private readonly string _routingKey;
/// <param name="routingKey">If exchange is empty, this is the name of the queue</param>
public SendMessage(string routingKey, string exchange, object message)
{
_routingKey = routingKey;
_exchange = exchange;
_message = message;
}
public void Perform(IModel channel)
{
var json = JsonConvert.SerializeObject(_message, s_serializerSettings);
var content = Encoding.UTF8.GetBytes(json);
channel.BasicPublish(
_exchange,
_routingKey,
null,
content);
}
}

View File

@ -0,0 +1,6 @@
using System;
using System.Threading.Tasks;
namespace CleanArchitecture.Domain.Rabbitmq;
public delegate Task<bool> ConsumeEventHandler(ReadOnlyMemory<byte> content);

View File

@ -0,0 +1,22 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace CleanArchitecture.Domain.Rabbitmq.Extensions;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddRabbitMqHandler(
this IServiceCollection services,
IConfiguration configuration,
string rabbitMqConfigSection)
{
var rabbitMq = new RabbitMqConfiguration();
configuration.Bind(rabbitMqConfigSection, rabbitMq);
services.AddSingleton(rabbitMq);
services.AddSingleton<RabbitMqHandler>();
services.AddHostedService(serviceProvider => serviceProvider.GetService<RabbitMqHandler>()!);
return services;
}
}

View File

@ -0,0 +1,9 @@
namespace CleanArchitecture.Domain.Rabbitmq;
public sealed class RabbitMqConfiguration
{
public string Host { get; set; } = string.Empty;
public bool Enabled { get; set; }
public string Username { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
}

View File

@ -0,0 +1,228 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Rabbitmq.Actions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace CleanArchitecture.Domain.Rabbitmq;
public sealed class RabbitMqHandler : BackgroundService
{
private readonly RabbitMqConfiguration _configuration;
private readonly ConcurrentDictionary<string, List<ConsumeEventHandler>> _consumers = new();
private readonly ILogger<RabbitMqHandler> _logger;
private readonly ConcurrentQueue<IRabbitMqAction> _pendingActions = new();
private readonly IModel? _channel;
private readonly IConnection? _connection;
public RabbitMqHandler(
RabbitMqConfiguration configuration,
ILogger<RabbitMqHandler> logger)
{
_configuration = configuration;
_logger = logger;
if (!configuration.Enabled)
{
logger.LogInformation("RabbitMQ is disabled. Connection will not be established");
return;
}
var factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = configuration.Host,
UserName = configuration.Username,
Password = configuration.Password,
DispatchConsumersAsync = true
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout)
{
if (!_configuration.Enabled)
{
_logger.LogInformation($"RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.");
return;
}
_pendingActions.Enqueue(new CreateExchange(exchangeName, type));
}
public void InitializeQueues(params string[] queueNames)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues.");
return;
}
foreach (var queue in queueNames)
{
_pendingActions.Enqueue(new CreateQueue(queue));
}
}
public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange.");
return;
}
_pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey));
}
public void AddConsumer(string queueName, ConsumeEventHandler consumer)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer.");
return;
}
// routingKey is set to queueName to mimic rabbitMQ
_pendingActions.Enqueue(
new RegisterConsumer(
string.Empty,
queueName,
queueName,
consumer,
AddEventConsumer));
}
public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer.");
return;
}
_pendingActions.Enqueue(
new RegisterConsumer(
exchange,
queue,
routingKey,
consumer,
AddEventConsumer));
}
public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer)
{
AddExchangeConsumer(exchange, string.Empty, queue, consumer);
}
private void AddEventConsumer(string exchange, string queueName, string routingKey, ConsumeEventHandler consumer)
{
var key = $"{exchange}-{routingKey}";
if (!_consumers.TryGetValue(key, out var consumers))
{
consumers = new List<ConsumeEventHandler>();
_consumers.TryAdd(key, consumers);
var eventHandler = new AsyncEventingBasicConsumer(_channel);
eventHandler.Received += CallEventConsumersAsync;
_channel!.BasicConsume(queueName, false, eventHandler);
}
consumers.Add(consumer);
}
private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea)
{
var key = $"{ea.Exchange}-{ea.RoutingKey}";
if (!_consumers.TryGetValue(key, out var consumers))
{
return;
}
foreach (var consumer in consumers)
{
try
{
await consumer(ea.Body);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error while handling event in queue {ea.RoutingKey}");
}
}
_pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag));
}
public void EnqueueMessage(string queueName, object message)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
return;
}
_pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message));
}
public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "")
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
return;
}
_pendingActions.Enqueue(new SendMessage(routingKey, exchange, message));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started");
return;
}
while (true)
{
HandleEnqueuedActions();
await Task.Delay(1000, stoppingToken);
}
}
private void HandleEnqueuedActions()
{
while (_pendingActions.TryDequeue(out var action))
{
try
{
action.Perform(_channel!);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error while trying to send a rabbitmq message");
_pendingActions.Enqueue(action);
}
}
}
}