0
0
mirror of https://github.com/alex289/CleanArchitecture.git synced 2025-08-22 19:28:34 +00:00

Merge pull request #26 from alex289/feature/rabbitmq

feat: Add rabbitmq
This commit is contained in:
Alex 2023-09-02 18:00:15 +02:00 committed by GitHub
commit f8e9ad741c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
79 changed files with 740 additions and 142 deletions

View File

@ -15,8 +15,8 @@ namespace CleanArchitecture.Api.BackgroundServices;
public sealed class SetInactiveUsersService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<SetInactiveUsersService> _logger;
private readonly IServiceProvider _serviceProvider;
public SetInactiveUsersService(
IServiceProvider serviceProvider,

View File

@ -8,6 +8,7 @@
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.ApplicationStatus" Version="7.0.0"/>
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" Version="7.0.0"/>
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="7.0.0"/>
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="7.0.0"/>
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="7.1.0"/>

View File

@ -3,6 +3,7 @@ using CleanArchitecture.Api.Extensions;
using CleanArchitecture.Application.Extensions;
using CleanArchitecture.Application.gRPC;
using CleanArchitecture.Domain.Extensions;
using CleanArchitecture.Domain.Rabbitmq.Extensions;
using CleanArchitecture.Infrastructure.Database;
using CleanArchitecture.Infrastructure.Extensions;
using HealthChecks.ApplicationStatus.DependencyInjection;
@ -14,7 +15,6 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
var builder = WebApplication.CreateBuilder(args);
@ -29,10 +29,17 @@ builder.Services
if (builder.Environment.IsProduction())
{
var rabbitHost = builder.Configuration["RabbitMQ:Host"];
var rabbitUser = builder.Configuration["RabbitMQ:Username"];
var rabbitPass = builder.Configuration["RabbitMQ:Password"];
builder.Services
.AddHealthChecks()
.AddSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")!)
.AddRedis(builder.Configuration["RedisHostName"]!, "Redis");
.AddRedis(builder.Configuration["RedisHostName"]!, "Redis")
.AddRabbitMQ(
rabbitConnectionString: $"amqp://{rabbitUser}:{rabbitPass}@{rabbitHost}",
name: "RabbitMQ");
}
builder.Services.AddDbContext<ApplicationDbContext>(options =>
@ -51,6 +58,8 @@ builder.Services.AddCommandHandlers();
builder.Services.AddNotificationHandlers();
builder.Services.AddApiUser();
builder.Services.AddRabbitMqHandler(builder.Configuration, "RabbitMQ");
builder.Services.AddHostedService<SetInactiveUsersService>();
builder.Services.AddMediatR(cfg => { cfg.RegisterServicesFromAssemblies(typeof(Program).Assembly); });

View File

@ -12,5 +12,11 @@
"Issuer": "CleanArchitectureServer",
"Audience": "CleanArchitectureClient",
"Secret": "sD3v061gf8BxXgmxcHssasjdlkasjd87439284)@#(*"
},
"RabbitMQ": {
"Host": "localhost",
"Username": "guest",
"Password": "guest",
"Enabled": "True"
}
}

View File

@ -0,0 +1,14 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"RabbitMQ": {
"Host": "localhost",
"Username": "guest",
"Password": "guest",
"Enabled": "False"
}
}

View File

@ -14,5 +14,11 @@
"Audience": "CleanArchitectureClient",
"Secret": "sD3v061gf8BxXgmxcHssasjdlkasjd87439284)@#(*"
},
"RedisHostName": "redis"
"RedisHostName": "redis",
"RabbitMQ": {
"Host": "rabbitmq",
"Username": "admin",
"Password": "DOIA9234JF",
"Enabled": "True"
}
}

View File

@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CleanArchitecture.Application.ViewModels;
using CleanArchitecture.Application.ViewModels.Users;

View File

@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CleanArchitecture.Application.Interfaces;
using CleanArchitecture.Application.Queries.Tenants.GetAll;

View File

@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CleanArchitecture.Application.Interfaces;
using CleanArchitecture.Application.Queries.Users.GetAll;

View File

@ -4,14 +4,15 @@ namespace CleanArchitecture.Application.ViewModels;
public sealed class PageQuery
{
private int _page = 1;
private int _pageSize = 10;
public int PageSize
{
get => _pageSize;
set => _pageSize = Math.Max(0, value);
}
private int _page = 1;
public int Page
{
get => _page;

View File

@ -23,7 +23,6 @@ public sealed class PagedResult<T>
// used by json deserializer
private PagedResult()
{
}
public static PagedResult<T> Empty()

View File

@ -1,7 +1,7 @@
using System;
using CleanArchitecture.Domain.Commands.Tenants.CreateTenant;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Shared.Events.Tenant;
using Xunit;
namespace CleanArchitecture.Domain.Tests.CommandHandler.Tenant.CreateTenant;

View File

@ -1,7 +1,7 @@
using System;
using CleanArchitecture.Domain.Commands.Tenants.DeleteTenant;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Shared.Events.Tenant;
using Xunit;
namespace CleanArchitecture.Domain.Tests.CommandHandler.Tenant.DeleteTenant;

View File

@ -1,7 +1,7 @@
using System;
using CleanArchitecture.Domain.Commands.Tenants.UpdateTenant;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Shared.Events.Tenant;
using Xunit;
namespace CleanArchitecture.Domain.Tests.CommandHandler.Tenant.UpdateTenant;

View File

@ -1,7 +1,7 @@
using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands.Users.ChangePassword;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Shared.Events.User;
using Xunit;
namespace CleanArchitecture.Domain.Tests.CommandHandler.User.ChangePassword;

View File

@ -2,7 +2,7 @@ using System;
using CleanArchitecture.Domain.Commands.Users.CreateUser;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Shared.Events.User;
using NSubstitute;
using Xunit;

View File

@ -1,7 +1,7 @@
using System;
using CleanArchitecture.Domain.Commands.Users.DeleteUser;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Shared.Events.User;
using Xunit;
namespace CleanArchitecture.Domain.Tests.CommandHandler.User.DeleteUser;

View File

@ -3,7 +3,7 @@ using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands.Users.UpdateUser;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Shared.Events.User;
using NSubstitute;
using Xunit;

View File

@ -1,9 +1,9 @@
using System;
using System.Linq.Expressions;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events;
using NSubstitute;
namespace CleanArchitecture.Domain.Tests;

View File

@ -5,9 +5,13 @@ namespace CleanArchitecture.Domain;
public static class CacheKeyGenerator
{
public static string GetEntityCacheKey<TEntity>(TEntity entity) where TEntity : Entity =>
$"{typeof(TEntity)}-{entity.Id}";
public static string GetEntityCacheKey<TEntity>(Guid id) where TEntity : Entity =>
$"{typeof(TEntity)}-{id}";
public static string GetEntityCacheKey<TEntity>(TEntity entity) where TEntity : Entity
{
return $"{typeof(TEntity)}-{entity.Id}";
}
public static string GetEntityCacheKey<TEntity>(Guid id) where TEntity : Entity
{
return $"{typeof(TEntity)}-{id}";
}
}

View File

@ -11,10 +11,15 @@
<PackageReference Include="MediatR" Version="12.1.1"/>
<PackageReference Include="Microsoft.Extensions.Options" Version="7.0.1"/>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3"/>
<PackageReference Include="RabbitMQ.Client" Version="6.5.0"/>
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.32.1"/>
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CleanArchitecture.Shared\CleanArchitecture.Shared.csproj"/>
</ItemGroup>
</Project>

View File

@ -3,10 +3,10 @@ using System.Threading.Tasks;
using CleanArchitecture.Domain.Entities;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.Tenant;
using MediatR;
namespace CleanArchitecture.Domain.Commands.Tenants.CreateTenant;

View File

@ -3,10 +3,10 @@ using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.Tenant;
using MediatR;
namespace CleanArchitecture.Domain.Commands.Tenants.DeleteTenant;

View File

@ -2,10 +2,10 @@ using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.Tenant;
using MediatR;
namespace CleanArchitecture.Domain.Commands.Tenants.UpdateTenant;

View File

@ -1,10 +1,10 @@
using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.User;
using MediatR;
using BC = BCrypt.Net.BCrypt;

View File

@ -3,10 +3,10 @@ using System.Threading.Tasks;
using CleanArchitecture.Domain.Entities;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.User;
using MediatR;
using BC = BCrypt.Net.BCrypt;

View File

@ -2,10 +2,10 @@ using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.User;
using MediatR;
namespace CleanArchitecture.Domain.Commands.Users.DeleteUser;

View File

@ -2,10 +2,10 @@ using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Enums;
using CleanArchitecture.Domain.Errors;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Domain.Interfaces.Repositories;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.User;
using MediatR;
namespace CleanArchitecture.Domain.Commands.Users.UpdateUser;

View File

@ -0,0 +1,6 @@
namespace CleanArchitecture.Domain.Constants;
public sealed class Messaging
{
public const string ExchangeNameNotifications = "exchange-notifications";
}

View File

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using CleanArchitecture.Shared.Events;
namespace CleanArchitecture.Domain.DomainEvents;

View File

@ -1,4 +1,5 @@
using System;
using CleanArchitecture.Shared.Events;
namespace CleanArchitecture.Domain.DomainEvents;

View File

@ -0,0 +1,27 @@
using System.Threading.Tasks;
using CleanArchitecture.Domain.Constants;
using CleanArchitecture.Domain.Rabbitmq;
using CleanArchitecture.Shared.Events;
namespace CleanArchitecture.Domain.EventHandler.Fanout;
public sealed class FanoutEventHandler : IFanoutEventHandler
{
private readonly RabbitMqHandler _rabbitMqHandler;
public FanoutEventHandler(
RabbitMqHandler rabbitMqHandler)
{
_rabbitMqHandler = rabbitMqHandler;
_rabbitMqHandler.InitializeExchange(Messaging.ExchangeNameNotifications);
}
public Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event)
{
_rabbitMqHandler.EnqueueExchangeMessage(
Messaging.ExchangeNameNotifications,
@event);
return Task.FromResult(@event);
}
}

View File

@ -0,0 +1,9 @@
using System.Threading.Tasks;
using CleanArchitecture.Shared.Events;
namespace CleanArchitecture.Domain.EventHandler.Fanout;
public interface IFanoutEventHandler
{
Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event);
}

View File

@ -1,7 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Entities;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Shared.Events.Tenant;
using MediatR;
using Microsoft.Extensions.Caching.Distributed;

View File

@ -1,7 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Entities;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Shared.Events.User;
using MediatR;
using Microsoft.Extensions.Caching.Distributed;

View File

@ -7,9 +7,10 @@ using CleanArchitecture.Domain.Commands.Users.DeleteUser;
using CleanArchitecture.Domain.Commands.Users.LoginUser;
using CleanArchitecture.Domain.Commands.Users.UpdateUser;
using CleanArchitecture.Domain.EventHandler;
using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.EventHandler.Fanout;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Shared.Events.Tenant;
using CleanArchitecture.Shared.Events.User;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
@ -36,6 +37,9 @@ public static class ServiceCollectionExtension
public static IServiceCollection AddNotificationHandlers(this IServiceCollection services)
{
// Fanout
services.AddScoped<IFanoutEventHandler, FanoutEventHandler>();
// User
services.AddScoped<INotificationHandler<UserCreatedEvent>, UserEventHandler>();
services.AddScoped<INotificationHandler<UserUpdatedEvent>, UserEventHandler>();

View File

@ -1,6 +1,6 @@
using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Shared.Events;
using MediatR;
namespace CleanArchitecture.Domain.Interfaces;

View File

@ -1,5 +1,5 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Shared.Events;
namespace CleanArchitecture.Domain.Notifications;

View File

@ -0,0 +1,22 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class BindQueueToExchange : IRabbitMqAction
{
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _routingKey;
public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
{
_exchangeName = exchangeName;
_routingKey = routingKey;
_queueName = queueName;
}
public void Perform(IModel channel)
{
channel.QueueBind(_queueName, _exchangeName, _routingKey);
}
}

View File

@ -0,0 +1,20 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class CreateExchange : IRabbitMqAction
{
private readonly string _name;
private readonly string _type;
public CreateExchange(string name, string type)
{
_name = name;
_type = type;
}
public void Perform(IModel channel)
{
channel.ExchangeDeclare(_name, _type);
}
}

View File

@ -0,0 +1,23 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class CreateQueue : IRabbitMqAction
{
public string QueueName { get; }
public CreateQueue(string queueName)
{
QueueName = queueName;
}
public void Perform(IModel channel)
{
channel.QueueDeclare(
QueueName,
false,
false,
false,
null);
}
}

View File

@ -0,0 +1,8 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public interface IRabbitMqAction
{
void Perform(IModel channel);
}

View File

@ -0,0 +1,32 @@
using System;
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class RegisterConsumer : IRabbitMqAction
{
private readonly Action<string, string, string, ConsumeEventHandler> _addConsumer;
private readonly ConsumeEventHandler _consumer;
private readonly string _exchange;
private readonly string _queue;
private readonly string _routingKey;
public RegisterConsumer(
string exchange,
string queue,
string routingKey,
ConsumeEventHandler consumer,
Action<string, string, string, ConsumeEventHandler> addConsumer)
{
_exchange = exchange;
_queue = queue;
_routingKey = routingKey;
_consumer = consumer;
_addConsumer = addConsumer;
}
public void Perform(IModel channel)
{
_addConsumer(_exchange, _queue, _routingKey, _consumer);
}
}

View File

@ -0,0 +1,18 @@
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class SendAcknowledgement : IRabbitMqAction
{
public ulong DeliveryTag { get; }
public SendAcknowledgement(ulong deliveryTag)
{
DeliveryTag = deliveryTag;
}
public void Perform(IModel channel)
{
channel.BasicAck(DeliveryTag, false);
}
}

View File

@ -0,0 +1,38 @@
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
public sealed class SendMessage : IRabbitMqAction
{
private static readonly JsonSerializerSettings s_serializerSettings =
new() { TypeNameHandling = TypeNameHandling.Objects };
private readonly string _exchange;
private readonly object _message;
private readonly string _routingKey;
/// <param name="routingKey">If exchange is empty, this is the name of the queue</param>
public SendMessage(string routingKey, string exchange, object message)
{
_routingKey = routingKey;
_exchange = exchange;
_message = message;
}
public void Perform(IModel channel)
{
var json = JsonConvert.SerializeObject(_message, s_serializerSettings);
var content = Encoding.UTF8.GetBytes(json);
channel.BasicPublish(
_exchange,
_routingKey,
null,
content);
}
}

View File

@ -0,0 +1,6 @@
using System;
using System.Threading.Tasks;
namespace CleanArchitecture.Domain.Rabbitmq;
public delegate Task<bool> ConsumeEventHandler(ReadOnlyMemory<byte> content);

View File

@ -0,0 +1,22 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace CleanArchitecture.Domain.Rabbitmq.Extensions;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddRabbitMqHandler(
this IServiceCollection services,
IConfiguration configuration,
string rabbitMqConfigSection)
{
var rabbitMq = new RabbitMqConfiguration();
configuration.Bind(rabbitMqConfigSection, rabbitMq);
services.AddSingleton(rabbitMq);
services.AddSingleton<RabbitMqHandler>();
services.AddHostedService(serviceProvider => serviceProvider.GetService<RabbitMqHandler>()!);
return services;
}
}

View File

@ -0,0 +1,9 @@
namespace CleanArchitecture.Domain.Rabbitmq;
public sealed class RabbitMqConfiguration
{
public string Host { get; set; } = string.Empty;
public bool Enabled { get; set; }
public string Username { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
}

View File

@ -0,0 +1,224 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Rabbitmq.Actions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace CleanArchitecture.Domain.Rabbitmq;
public sealed class RabbitMqHandler : BackgroundService
{
private readonly IModel? _channel;
private readonly RabbitMqConfiguration _configuration;
private readonly ConcurrentDictionary<string, List<ConsumeEventHandler>> _consumers = new();
private readonly ILogger<RabbitMqHandler> _logger;
private readonly ConcurrentQueue<IRabbitMqAction> _pendingActions = new();
public RabbitMqHandler(
RabbitMqConfiguration configuration,
ILogger<RabbitMqHandler> logger)
{
_configuration = configuration;
_logger = logger;
if (!configuration.Enabled)
{
logger.LogInformation("RabbitMQ is disabled. Connection will not be established");
return;
}
var factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = configuration.Host,
UserName = configuration.Username,
Password = configuration.Password,
DispatchConsumersAsync = true
};
var connection = factory.CreateConnection();
_channel = connection.CreateModel();
}
public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout)
{
if (!_configuration.Enabled)
{
_logger.LogInformation($"RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.");
return;
}
_pendingActions.Enqueue(new CreateExchange(exchangeName, type));
}
public void InitializeQueues(params string[] queueNames)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues.");
return;
}
foreach (var queue in queueNames)
{
_pendingActions.Enqueue(new CreateQueue(queue));
}
}
public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange.");
return;
}
_pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey));
}
public void AddConsumer(string queueName, ConsumeEventHandler consumer)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer.");
return;
}
// routingKey is set to queueName to mimic rabbitMQ
_pendingActions.Enqueue(
new RegisterConsumer(
string.Empty,
queueName,
queueName,
consumer,
AddEventConsumer));
}
public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer.");
return;
}
_pendingActions.Enqueue(
new RegisterConsumer(
exchange,
queue,
routingKey,
consumer,
AddEventConsumer));
}
public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer)
{
AddExchangeConsumer(exchange, string.Empty, queue, consumer);
}
private void AddEventConsumer(string exchange, string queueName, string routingKey, ConsumeEventHandler consumer)
{
var key = $"{exchange}-{routingKey}";
if (!_consumers.TryGetValue(key, out var consumers))
{
consumers = new List<ConsumeEventHandler>();
_consumers.TryAdd(key, consumers);
var eventHandler = new AsyncEventingBasicConsumer(_channel);
eventHandler.Received += CallEventConsumersAsync;
_channel!.BasicConsume(queueName, false, eventHandler);
}
consumers.Add(consumer);
}
private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea)
{
var key = $"{ea.Exchange}-{ea.RoutingKey}";
if (!_consumers.TryGetValue(key, out var consumers))
{
return;
}
foreach (var consumer in consumers)
{
try
{
await consumer(ea.Body);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error while handling event in queue {ea.RoutingKey}");
}
}
_pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag));
}
public void EnqueueMessage(string queueName, object message)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
return;
}
_pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message));
}
public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "")
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
return;
}
_pendingActions.Enqueue(new SendMessage(routingKey, exchange, message));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_configuration.Enabled)
{
_logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started");
return;
}
while (true)
{
HandleEnqueuedActions();
await Task.Delay(1000, stoppingToken);
}
}
private void HandleEnqueuedActions()
{
while (_pendingActions.TryDequeue(out var action))
{
try
{
action.Perform(_channel!);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error while trying to send a rabbitmq message");
_pendingActions.Enqueue(action);
}
}
}
}

View File

@ -2,8 +2,9 @@ using System;
using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands.Users.DeleteUser;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.EventHandler.Fanout;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Shared.Events.User;
using MediatR;
using NSubstitute;
using Xunit;
@ -17,8 +18,9 @@ public sealed class InMemoryBusTests
{
var mediator = Substitute.For<IMediator>();
var domainEventStore = Substitute.For<IDomainEventStore>();
var fanoutEventHandler = Substitute.For<IFanoutEventHandler>();
var inMemoryBus = new InMemoryBus(mediator, domainEventStore);
var inMemoryBus = new InMemoryBus(mediator, domainEventStore, fanoutEventHandler);
const string key = "Key";
const string value = "Value";
@ -36,8 +38,9 @@ public sealed class InMemoryBusTests
{
var mediator = Substitute.For<IMediator>();
var domainEventStore = Substitute.For<IDomainEventStore>();
var fanoutEventHandler = Substitute.For<IFanoutEventHandler>();
var inMemoryBus = new InMemoryBus(mediator, domainEventStore);
var inMemoryBus = new InMemoryBus(mediator, domainEventStore, fanoutEventHandler);
var userDeletedEvent = new UserDeletedEvent(Guid.NewGuid(), Guid.NewGuid());
@ -51,8 +54,9 @@ public sealed class InMemoryBusTests
{
var mediator = Substitute.For<IMediator>();
var domainEventStore = Substitute.For<IDomainEventStore>();
var fanoutEventHandler = Substitute.For<IFanoutEventHandler>();
var inMemoryBus = new InMemoryBus(mediator, domainEventStore);
var inMemoryBus = new InMemoryBus(mediator, domainEventStore, fanoutEventHandler);
var deleteUserCommand = new DeleteUserCommand(Guid.NewGuid());

View File

@ -7,6 +7,7 @@
<ItemGroup>
<ProjectReference Include="..\CleanArchitecture.Domain\CleanArchitecture.Domain.csproj"/>
<ProjectReference Include="..\CleanArchitecture.Shared\CleanArchitecture.Shared.csproj"/>
</ItemGroup>
<ItemGroup>

View File

@ -3,6 +3,7 @@ using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.DomainNotifications;
using CleanArchitecture.Domain.Notifications;
using CleanArchitecture.Infrastructure.Database;
using CleanArchitecture.Shared.Events;
using Newtonsoft.Json;
namespace CleanArchitecture.Infrastructure.EventSourcing;

View File

@ -1,7 +1,9 @@
using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.EventHandler.Fanout;
using CleanArchitecture.Domain.Interfaces;
using CleanArchitecture.Shared.Events;
using MediatR;
namespace CleanArchitecture.Infrastructure;
@ -9,14 +11,17 @@ namespace CleanArchitecture.Infrastructure;
public sealed class InMemoryBus : IMediatorHandler
{
private readonly IDomainEventStore _domainEventStore;
private readonly IFanoutEventHandler _fanoutEventHandler;
private readonly IMediator _mediator;
public InMemoryBus(
IMediator mediator,
IDomainEventStore domainEventStore)
IDomainEventStore domainEventStore,
IFanoutEventHandler fanoutEventHandler)
{
_mediator = mediator;
_domainEventStore = domainEventStore;
_fanoutEventHandler = fanoutEventHandler;
}
public Task<TResponse> QueryAsync<TResponse>(IRequest<TResponse> query)
@ -29,6 +34,8 @@ public sealed class InMemoryBus : IMediatorHandler
await _domainEventStore.SaveAsync(@event);
await _mediator.Publish(@event);
await _fanoutEventHandler.HandleDomainEventAsync(@event);
}
public Task SendCommandAsync<T>(T command) where T : CommandBase

View File

@ -6,4 +6,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MediatR" Version="12.1.1"/>
</ItemGroup>
</Project>

View File

@ -1,7 +1,7 @@
using System;
using MediatR;
namespace CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Shared.Events;
public abstract class DomainEvent : Message, INotification
{

View File

@ -1,7 +1,7 @@
using System;
using MediatR;
namespace CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Shared.Events;
public abstract class Message : IRequest
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.Tenant;
namespace CleanArchitecture.Shared.Events.Tenant;
public sealed class TenantCreatedEvent : DomainEvent
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.Tenant;
namespace CleanArchitecture.Shared.Events.Tenant;
public sealed class TenantDeletedEvent : DomainEvent
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.Tenant;
namespace CleanArchitecture.Shared.Events.Tenant;
public sealed class TenantUpdatedEvent : DomainEvent
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.User;
namespace CleanArchitecture.Shared.Events.User;
public sealed class PasswordChangedEvent : DomainEvent
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.User;
namespace CleanArchitecture.Shared.Events.User;
public sealed class UserCreatedEvent : DomainEvent
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.User;
namespace CleanArchitecture.Shared.Events.User;
public sealed class UserDeletedEvent : DomainEvent
{

View File

@ -1,7 +1,6 @@
using System;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.Events.User;
namespace CleanArchitecture.Shared.Events.User;
public sealed class UserUpdatedEvent : DomainEvent
{

View File

@ -49,6 +49,7 @@ options.ConfigurationOptions = new ConfigurationOptions
EndPoints = { "localhost", "6379" }
};
```
3. RabbitMq: `docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management`
Running the container
1. Build the Dockerfile: `docker build -t clean-architecture .`
@ -64,7 +65,8 @@ Running the container
1. Change the ConnectionString in the appsettings.json to `Server=clean-architecture-db-service;Database=clean-architecture;Trusted_Connection=False;MultipleActiveResultSets=true;TrustServerCertificate=True;User Id=SA;Password=Password123!#`
2. Change the RedisHostName in the appsettings.json to `redis-service`
3. Build the docker image and push it to the docker hub (Change the image name in the `k8s-deployment.yml` to your own)
3. Change the RabbitMQ Host in the appsettings.json to `rabbitmq-service`
4. Build the docker image and push it to the docker hub (Change the image name in the `k8s-deployment.yml` to your own)
Apply the deployment file: `kubectl apply -f k8s-deployment.yml` (Delete: `kubectl delete -f k8s-deployment.yml`)

View File

@ -7,7 +7,10 @@ services:
ports:
- 80:80
depends_on:
- db
db:
condition: service_started
rabbitmq:
condition: service_healthy
links:
- db
healthcheck:
@ -15,6 +18,7 @@ services:
interval: 30s
timeout: 5s
retries: 3
db:
image: mcr.microsoft.com/mssql/server
environment:
@ -22,6 +26,7 @@ services:
- SA_PASSWORD=Password123!#
ports:
- 1433:1433
redis:
image: docker.io/bitnami/redis:7.2
environment:
@ -32,6 +37,24 @@ services:
- '6379:6379'
volumes:
- 'redis_data:/bitnami/redis/data'
rabbitmq:
image: "rabbitmq:3-management"
ports:
- 5672:5672
- 15672:15672
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=DOIA9234JF
volumes:
- rabbitmq_data:/var/lib/rabbitmq
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 3s
retries: 3
volumes:
rabbitmq_data:
redis_data:
driver: local

View File

@ -118,3 +118,54 @@ spec:
- protocol: TCP
port: 6379
targetPort: 6379
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: rabbitmq-deployment
spec:
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:3-management
ports:
- containerPort: 5672
- containerPort: 15672
env:
- name: RABBITMQ_DEFAULT_USER
value: admin
- name: RABBITMQ_DEFAULT_PASS
value: DOIA9234JF
volumeMounts:
- name: rabbitmq-data
mountPath: /var/lib/rabbitmq
volumes:
- name: rabbitmq-data
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-service
spec:
selector:
app: rabbitmq
ports:
- name: rabbitmq-port
protocol: TCP
port: 5672
targetPort: 5672
- name: rabbitmq-management-port
protocol: TCP
port: 15672
targetPort: 15672