mirror of
https://github.com/alex289/CleanArchitecture.git
synced 2025-06-29 18:21:08 +00:00
feat: MassTransit
This commit is contained in:
parent
67c1de4330
commit
74549806d6
@ -14,16 +14,18 @@
|
|||||||
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="9.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.SqlServer" Version="9.0.0" />
|
||||||
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="9.0.0" />
|
||||||
<PackageReference Include="Grpc.AspNetCore.Server.Reflection" Version="2.67.0" />
|
<PackageReference Include="Grpc.AspNetCore.Server.Reflection" Version="2.70.0" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.2" />
|
<PackageReference Include="MassTransit.Newtonsoft" Version="8.3.7" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2" />
|
<PackageReference Include="MassTransit.RabbitMQ" Version="8.3.7" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.2">
|
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.3" />
|
||||||
|
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.3" />
|
||||||
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
|
||||||
<PrivateAssets>all</PrivateAssets>
|
<PrivateAssets>all</PrivateAssets>
|
||||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||||
</PackageReference>
|
</PackageReference>
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.2" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.3" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.2" />
|
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.2" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="9.0.2" />
|
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.EntityFrameworkCore" Version="9.0.3" />
|
||||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.1" />
|
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.1" />
|
||||||
<PackageReference Include="Swashbuckle.AspNetCore.Annotations" Version="7.3.1" />
|
<PackageReference Include="Swashbuckle.AspNetCore.Annotations" Version="7.3.1" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
using CleanArchitecture.Domain.Rabbitmq;
|
using CleanArchitecture.Domain.Settings;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
|
|
||||||
namespace CleanArchitecture.Api.Extensions;
|
namespace CleanArchitecture.Api.Extensions;
|
||||||
@ -11,7 +11,6 @@ public static class ConfigurationExtensions
|
|||||||
{
|
{
|
||||||
var isAspire = configuration["ASPIRE_ENABLED"] == "true";
|
var isAspire = configuration["ASPIRE_ENABLED"] == "true";
|
||||||
|
|
||||||
var rabbitEnabled = configuration["RabbitMQ:Enabled"];
|
|
||||||
var rabbitHost = configuration["RabbitMQ:Host"];
|
var rabbitHost = configuration["RabbitMQ:Host"];
|
||||||
var rabbitPort = configuration["RabbitMQ:Port"];
|
var rabbitPort = configuration["RabbitMQ:Port"];
|
||||||
var rabbitUser = configuration["RabbitMQ:Username"];
|
var rabbitUser = configuration["RabbitMQ:Username"];
|
||||||
@ -19,7 +18,6 @@ public static class ConfigurationExtensions
|
|||||||
|
|
||||||
if (isAspire)
|
if (isAspire)
|
||||||
{
|
{
|
||||||
rabbitEnabled = "true";
|
|
||||||
var connectionString = configuration["ConnectionStrings:RabbitMq"];
|
var connectionString = configuration["ConnectionStrings:RabbitMq"];
|
||||||
|
|
||||||
var rabbitUri = new Uri(connectionString!);
|
var rabbitUri = new Uri(connectionString!);
|
||||||
@ -33,7 +31,6 @@ public static class ConfigurationExtensions
|
|||||||
{
|
{
|
||||||
Host = rabbitHost ?? "",
|
Host = rabbitHost ?? "",
|
||||||
Port = int.Parse(rabbitPort ?? "0"),
|
Port = int.Parse(rabbitPort ?? "0"),
|
||||||
Enabled = bool.Parse(rabbitEnabled ?? "false"),
|
|
||||||
Username = rabbitUser ?? "",
|
Username = rabbitUser ?? "",
|
||||||
Password = rabbitPass ?? ""
|
Password = rabbitPass ?? ""
|
||||||
};
|
};
|
||||||
|
@ -4,19 +4,21 @@ using CleanArchitecture.Api.BackgroundServices;
|
|||||||
using CleanArchitecture.Api.Extensions;
|
using CleanArchitecture.Api.Extensions;
|
||||||
using CleanArchitecture.Application.Extensions;
|
using CleanArchitecture.Application.Extensions;
|
||||||
using CleanArchitecture.Application.gRPC;
|
using CleanArchitecture.Application.gRPC;
|
||||||
|
using CleanArchitecture.Domain.Consumers;
|
||||||
using CleanArchitecture.Domain.Extensions;
|
using CleanArchitecture.Domain.Extensions;
|
||||||
using CleanArchitecture.Domain.Rabbitmq.Extensions;
|
|
||||||
using CleanArchitecture.Infrastructure.Database;
|
using CleanArchitecture.Infrastructure.Database;
|
||||||
using CleanArchitecture.Infrastructure.Extensions;
|
using CleanArchitecture.Infrastructure.Extensions;
|
||||||
using CleanArchitecture.ServiceDefaults;
|
using CleanArchitecture.ServiceDefaults;
|
||||||
using HealthChecks.ApplicationStatus.DependencyInjection;
|
using HealthChecks.ApplicationStatus.DependencyInjection;
|
||||||
using HealthChecks.UI.Client;
|
using HealthChecks.UI.Client;
|
||||||
|
using MassTransit;
|
||||||
using Microsoft.AspNetCore.Builder;
|
using Microsoft.AspNetCore.Builder;
|
||||||
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Newtonsoft.Json;
|
||||||
using RabbitMQ.Client;
|
using RabbitMQ.Client;
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
@ -82,7 +84,48 @@ builder.Services.AddCommandHandlers();
|
|||||||
builder.Services.AddNotificationHandlers();
|
builder.Services.AddNotificationHandlers();
|
||||||
builder.Services.AddApiUser();
|
builder.Services.AddApiUser();
|
||||||
|
|
||||||
builder.Services.AddRabbitMqHandler(rabbitConfiguration);
|
builder.Services.AddMassTransit(x =>
|
||||||
|
{
|
||||||
|
x.AddConsumer<FanoutEventConsumer>();
|
||||||
|
x.AddConsumer<TenantUpdatedEventConsumer>();
|
||||||
|
|
||||||
|
x.UsingRabbitMq((context, cfg) =>
|
||||||
|
{
|
||||||
|
cfg.ConfigureNewtonsoftJsonSerializer(settings =>
|
||||||
|
{
|
||||||
|
settings.TypeNameHandling = TypeNameHandling.Objects;
|
||||||
|
settings.NullValueHandling = NullValueHandling.Ignore;
|
||||||
|
return settings;
|
||||||
|
});
|
||||||
|
cfg.UseNewtonsoftJsonSerializer();
|
||||||
|
cfg.ConfigureNewtonsoftJsonDeserializer(settings =>
|
||||||
|
{
|
||||||
|
settings.TypeNameHandling = TypeNameHandling.Objects;
|
||||||
|
settings.NullValueHandling = NullValueHandling.Ignore;
|
||||||
|
return settings;
|
||||||
|
});
|
||||||
|
|
||||||
|
cfg.Host(rabbitConfiguration.Host, (ushort)rabbitConfiguration.Port, "/", h => {
|
||||||
|
h.Username(rabbitConfiguration.Username);
|
||||||
|
h.Password(rabbitConfiguration.Password);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Every instance of the service will receive the message
|
||||||
|
cfg.ReceiveEndpoint("clean-architecture-fanout-event-" + Guid.NewGuid(), e =>
|
||||||
|
{
|
||||||
|
e.Durable = false;
|
||||||
|
e.AutoDelete = true;
|
||||||
|
e.ConfigureConsumer<FanoutEventConsumer>(context);
|
||||||
|
e.DiscardSkippedMessages();
|
||||||
|
});
|
||||||
|
cfg.ReceiveEndpoint("clean-architecture-fanout-events", e =>
|
||||||
|
{
|
||||||
|
e.ConfigureConsumer<TenantUpdatedEventConsumer>(context);
|
||||||
|
e.DiscardSkippedMessages();
|
||||||
|
});
|
||||||
|
cfg.ConfigureEndpoints(context);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
builder.Services.AddHostedService<SetInactiveUsersService>();
|
builder.Services.AddHostedService<SetInactiveUsersService>();
|
||||||
|
|
||||||
@ -148,7 +191,7 @@ app.MapControllers();
|
|||||||
app.MapGrpcService<UsersApiImplementation>();
|
app.MapGrpcService<UsersApiImplementation>();
|
||||||
app.MapGrpcService<TenantsApiImplementation>();
|
app.MapGrpcService<TenantsApiImplementation>();
|
||||||
|
|
||||||
app.Run();
|
await app.RunAsync();
|
||||||
|
|
||||||
// Needed for integration tests web application factory
|
// Needed for integration tests web application factory
|
||||||
public partial class Program
|
public partial class Program
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
<PackageReference Include="Aspire.Hosting.AppHost" Version="9.1.0" />
|
<PackageReference Include="Aspire.Hosting.AppHost" Version="9.1.0" />
|
||||||
<PackageReference Include="Aspire.Hosting.RabbitMQ" Version="9.1.0" />
|
<PackageReference Include="Aspire.Hosting.RabbitMQ" Version="9.1.0" />
|
||||||
<PackageReference Include="Aspire.Hosting.Redis" Version="9.1.0" />
|
<PackageReference Include="Aspire.Hosting.Redis" Version="9.1.0" />
|
||||||
<PackageReference Include="Aspire.Hosting.SqlServer" Version="9.0.0" />
|
<PackageReference Include="Aspire.Hosting.SqlServer" Version="9.1.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
@ -8,11 +8,11 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3" />
|
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3" />
|
||||||
<PackageReference Include="FluentValidation" Version="11.11.0" />
|
<PackageReference Include="FluentValidation" Version="11.11.0" />
|
||||||
|
<PackageReference Include="MassTransit" Version="8.3.7" />
|
||||||
<PackageReference Include="MediatR" Version="12.4.1" />
|
<PackageReference Include="MediatR" Version="12.4.1" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.2" />
|
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.3" />
|
||||||
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
||||||
<PackageReference Include="RabbitMQ.Client" Version="7.1.1" />
|
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.6.1" />
|
||||||
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.6.0" />
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
@ -1,6 +0,0 @@
|
|||||||
namespace CleanArchitecture.Domain.Constants;
|
|
||||||
|
|
||||||
public sealed class Messaging
|
|
||||||
{
|
|
||||||
public const string ExchangeNameNotifications = "exchange-notifications";
|
|
||||||
}
|
|
22
CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs
Normal file
22
CleanArchitecture.Domain/Consumers/FanoutEventConsumer.cs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
using System.Threading.Tasks;
|
||||||
|
using CleanArchitecture.Shared.Events;
|
||||||
|
using MassTransit;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace CleanArchitecture.Domain.Consumers;
|
||||||
|
|
||||||
|
public sealed class FanoutEventConsumer : IConsumer<FanoutDomainEvent>
|
||||||
|
{
|
||||||
|
private readonly ILogger<FanoutEventConsumer> _logger;
|
||||||
|
|
||||||
|
public FanoutEventConsumer(ILogger<FanoutEventConsumer> logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Consume(ConsumeContext<FanoutDomainEvent> context)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("FanoutDomainEventConsumer: {FanoutDomainEvent}", context.Message);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
using System.Threading.Tasks;
|
||||||
|
using CleanArchitecture.Shared.Events.Tenant;
|
||||||
|
using MassTransit;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace CleanArchitecture.Domain.Consumers;
|
||||||
|
|
||||||
|
public sealed class TenantUpdatedEventConsumer : IConsumer<TenantUpdatedEvent>
|
||||||
|
{
|
||||||
|
private readonly ILogger<TenantUpdatedEventConsumer> _logger;
|
||||||
|
|
||||||
|
public TenantUpdatedEventConsumer(ILogger<TenantUpdatedEventConsumer> logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Consume(ConsumeContext<TenantUpdatedEvent> context)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("TenantUpdatedEventConsumer: {TenantId}", context.Message.AggregateId);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
@ -1,27 +1,33 @@
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using CleanArchitecture.Domain.Constants;
|
using CleanArchitecture.Domain.Interfaces;
|
||||||
using CleanArchitecture.Domain.Rabbitmq;
|
|
||||||
using CleanArchitecture.Shared.Events;
|
using CleanArchitecture.Shared.Events;
|
||||||
|
using MassTransit;
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.EventHandler.Fanout;
|
namespace CleanArchitecture.Domain.EventHandler.Fanout;
|
||||||
|
|
||||||
public sealed class FanoutEventHandler : IFanoutEventHandler
|
public sealed class FanoutEventHandler : IFanoutEventHandler
|
||||||
{
|
{
|
||||||
private readonly RabbitMqHandler _rabbitMqHandler;
|
private readonly IPublishEndpoint _massTransit;
|
||||||
|
private readonly IUser _user;
|
||||||
|
|
||||||
public FanoutEventHandler(
|
public FanoutEventHandler(
|
||||||
RabbitMqHandler rabbitMqHandler)
|
IPublishEndpoint massTransit, IUser user)
|
||||||
{
|
{
|
||||||
_rabbitMqHandler = rabbitMqHandler;
|
_massTransit = massTransit;
|
||||||
_rabbitMqHandler.InitializeExchange(Messaging.ExchangeNameNotifications);
|
_user = user;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event)
|
public async Task<T> HandleDomainEventAsync<T>(T @event) where T : DomainEvent
|
||||||
{
|
{
|
||||||
_rabbitMqHandler.EnqueueExchangeMessage(
|
var fanoutDomainEvent =
|
||||||
Messaging.ExchangeNameNotifications,
|
new FanoutDomainEvent(
|
||||||
@event);
|
@event.AggregateId,
|
||||||
|
@event,
|
||||||
|
_user.GetUserId());
|
||||||
|
|
||||||
|
await _massTransit.Publish(fanoutDomainEvent);
|
||||||
|
await _massTransit.Publish(@event);
|
||||||
|
|
||||||
return Task.FromResult(@event);
|
return @event;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -5,5 +5,5 @@ namespace CleanArchitecture.Domain.EventHandler.Fanout;
|
|||||||
|
|
||||||
public interface IFanoutEventHandler
|
public interface IFanoutEventHandler
|
||||||
{
|
{
|
||||||
Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event);
|
Task<T> HandleDomainEventAsync<T>(T @event) where T : DomainEvent;
|
||||||
}
|
}
|
@ -1,23 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public sealed class BindQueueToExchange : IRabbitMqAction
|
|
||||||
{
|
|
||||||
private readonly string _exchangeName;
|
|
||||||
private readonly string _queueName;
|
|
||||||
private readonly string _routingKey;
|
|
||||||
|
|
||||||
public BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
|
|
||||||
{
|
|
||||||
_exchangeName = exchangeName;
|
|
||||||
_routingKey = routingKey;
|
|
||||||
_queueName = queueName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Perform(IChannel channel)
|
|
||||||
{
|
|
||||||
await channel.QueueBindAsync(_queueName, _exchangeName, _routingKey);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,21 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public sealed class CreateExchange : IRabbitMqAction
|
|
||||||
{
|
|
||||||
private readonly string _name;
|
|
||||||
private readonly string _type;
|
|
||||||
|
|
||||||
public CreateExchange(string name, string type)
|
|
||||||
{
|
|
||||||
_name = name;
|
|
||||||
_type = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Perform(IChannel channel)
|
|
||||||
{
|
|
||||||
await channel.ExchangeDeclareAsync(_name, _type);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public sealed class CreateQueue : IRabbitMqAction
|
|
||||||
{
|
|
||||||
public string QueueName { get; }
|
|
||||||
|
|
||||||
public CreateQueue(string queueName)
|
|
||||||
{
|
|
||||||
QueueName = queueName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Perform(IChannel channel)
|
|
||||||
{
|
|
||||||
await channel.QueueDeclareAsync(
|
|
||||||
QueueName,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
null);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,9 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public interface IRabbitMqAction
|
|
||||||
{
|
|
||||||
Task Perform(IChannel channel);
|
|
||||||
}
|
|
@ -1,33 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public sealed class RegisterConsumer : IRabbitMqAction
|
|
||||||
{
|
|
||||||
private readonly Func<string, string, string, ConsumeEventHandler, Task> _addConsumer;
|
|
||||||
private readonly ConsumeEventHandler _consumer;
|
|
||||||
private readonly string _exchange;
|
|
||||||
private readonly string _queue;
|
|
||||||
private readonly string _routingKey;
|
|
||||||
|
|
||||||
public RegisterConsumer(
|
|
||||||
string exchange,
|
|
||||||
string queue,
|
|
||||||
string routingKey,
|
|
||||||
ConsumeEventHandler consumer,
|
|
||||||
Func<string, string, string, ConsumeEventHandler, Task> addConsumer)
|
|
||||||
{
|
|
||||||
_exchange = exchange;
|
|
||||||
_queue = queue;
|
|
||||||
_routingKey = routingKey;
|
|
||||||
_consumer = consumer;
|
|
||||||
_addConsumer = addConsumer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Perform(IChannel channel)
|
|
||||||
{
|
|
||||||
await _addConsumer(_exchange, _queue, _routingKey, _consumer);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,19 +0,0 @@
|
|||||||
using System.Threading.Tasks;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public sealed class SendAcknowledgement : IRabbitMqAction
|
|
||||||
{
|
|
||||||
public ulong DeliveryTag { get; }
|
|
||||||
|
|
||||||
public SendAcknowledgement(ulong deliveryTag)
|
|
||||||
{
|
|
||||||
DeliveryTag = deliveryTag;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Perform(IChannel channel)
|
|
||||||
{
|
|
||||||
await channel.BasicAckAsync(DeliveryTag, false);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,38 +0,0 @@
|
|||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Newtonsoft.Json;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
|
|
||||||
public sealed class SendMessage : IRabbitMqAction
|
|
||||||
{
|
|
||||||
private static readonly JsonSerializerSettings s_serializerSettings =
|
|
||||||
new() { TypeNameHandling = TypeNameHandling.Objects };
|
|
||||||
|
|
||||||
private readonly string _exchange;
|
|
||||||
private readonly object _message;
|
|
||||||
|
|
||||||
private readonly string _routingKey;
|
|
||||||
|
|
||||||
|
|
||||||
/// <param name="routingKey">If exchange is empty, this is the name of the queue</param>
|
|
||||||
public SendMessage(string routingKey, string exchange, object message)
|
|
||||||
{
|
|
||||||
_routingKey = routingKey;
|
|
||||||
_exchange = exchange;
|
|
||||||
_message = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task Perform(IChannel channel)
|
|
||||||
{
|
|
||||||
var json = JsonConvert.SerializeObject(_message, s_serializerSettings);
|
|
||||||
|
|
||||||
var content = Encoding.UTF8.GetBytes(json);
|
|
||||||
|
|
||||||
await channel.BasicPublishAsync(
|
|
||||||
_exchange,
|
|
||||||
_routingKey,
|
|
||||||
content);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq;
|
|
||||||
|
|
||||||
public delegate Task<bool> ConsumeEventHandler(ReadOnlyMemory<byte> content);
|
|
@ -1,18 +0,0 @@
|
|||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq.Extensions;
|
|
||||||
|
|
||||||
public static class ServiceCollectionExtensions
|
|
||||||
{
|
|
||||||
public static IServiceCollection AddRabbitMqHandler(
|
|
||||||
this IServiceCollection services,
|
|
||||||
RabbitMqConfiguration configuration)
|
|
||||||
{
|
|
||||||
services.AddSingleton(configuration);
|
|
||||||
|
|
||||||
services.AddSingleton<RabbitMqHandler>();
|
|
||||||
services.AddHostedService(serviceProvider => serviceProvider.GetService<RabbitMqHandler>()!);
|
|
||||||
|
|
||||||
return services;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,240 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using CleanArchitecture.Domain.Rabbitmq.Actions;
|
|
||||||
using Microsoft.Extensions.Hosting;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using RabbitMQ.Client;
|
|
||||||
using RabbitMQ.Client.Events;
|
|
||||||
|
|
||||||
namespace CleanArchitecture.Domain.Rabbitmq;
|
|
||||||
|
|
||||||
public sealed class RabbitMqHandler : BackgroundService
|
|
||||||
{
|
|
||||||
private readonly RabbitMqConfiguration _configuration;
|
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<string, List<ConsumeEventHandler>> _consumers = new();
|
|
||||||
|
|
||||||
private readonly ILogger<RabbitMqHandler> _logger;
|
|
||||||
|
|
||||||
private readonly ConcurrentQueue<IRabbitMqAction> _pendingActions = new();
|
|
||||||
private IChannel? _channel;
|
|
||||||
|
|
||||||
public RabbitMqHandler(
|
|
||||||
RabbitMqConfiguration configuration,
|
|
||||||
ILogger<RabbitMqHandler> logger)
|
|
||||||
{
|
|
||||||
_configuration = configuration;
|
|
||||||
_logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Connection will not be established");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_logger.LogInformation("Starting RabbitMQ connection");
|
|
||||||
|
|
||||||
var factory = new ConnectionFactory
|
|
||||||
{
|
|
||||||
AutomaticRecoveryEnabled = true,
|
|
||||||
HostName = _configuration.Host,
|
|
||||||
Port = _configuration.Port,
|
|
||||||
UserName = _configuration.Username,
|
|
||||||
Password = _configuration.Password
|
|
||||||
};
|
|
||||||
|
|
||||||
var connection = await factory.CreateConnectionAsync(cancellationToken);
|
|
||||||
_channel = await connection.CreateChannelAsync(null, cancellationToken);
|
|
||||||
|
|
||||||
await base.StartAsync(cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.",
|
|
||||||
exchangeName);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_pendingActions.Enqueue(new CreateExchange(exchangeName, type));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void InitializeQueues(params string[] queueNames)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the creation of queues.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var queue in queueNames)
|
|
||||||
{
|
|
||||||
_pendingActions.Enqueue(new CreateQueue(queue));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void BindQueueToExchange(string queueName, string exchangeName, string routingKey = "")
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the binding of queue to exchange.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_pendingActions.Enqueue(new BindQueueToExchange(queueName, exchangeName, routingKey));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void AddConsumer(string queueName, ConsumeEventHandler consumer)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of consumer.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// routingKey is set to queueName to mimic rabbitMQ
|
|
||||||
_pendingActions.Enqueue(
|
|
||||||
new RegisterConsumer(
|
|
||||||
string.Empty,
|
|
||||||
queueName,
|
|
||||||
queueName,
|
|
||||||
consumer,
|
|
||||||
AddEventConsumer));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void AddExchangeConsumer(string exchange, string routingKey, string queue, ConsumeEventHandler consumer)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping the addition of exchange consumer.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_pendingActions.Enqueue(
|
|
||||||
new RegisterConsumer(
|
|
||||||
exchange,
|
|
||||||
queue,
|
|
||||||
routingKey,
|
|
||||||
consumer,
|
|
||||||
AddEventConsumer));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void AddExchangeConsumer(string exchange, string queue, ConsumeEventHandler consumer)
|
|
||||||
{
|
|
||||||
AddExchangeConsumer(exchange, string.Empty, queue, consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task AddEventConsumer(string exchange, string queueName, string routingKey,
|
|
||||||
ConsumeEventHandler consumer)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Event consumer will not be added.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var key = $"{exchange}-{routingKey}";
|
|
||||||
|
|
||||||
if (!_consumers.TryGetValue(key, out var consumers))
|
|
||||||
{
|
|
||||||
consumers = new List<ConsumeEventHandler>();
|
|
||||||
_consumers.TryAdd(key, consumers);
|
|
||||||
|
|
||||||
var eventHandler = new AsyncEventingBasicConsumer(_channel!);
|
|
||||||
eventHandler.ReceivedAsync += CallEventConsumersAsync;
|
|
||||||
|
|
||||||
await _channel!.BasicConsumeAsync(queueName, false, eventHandler);
|
|
||||||
}
|
|
||||||
|
|
||||||
consumers.Add(consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task CallEventConsumersAsync(object sender, BasicDeliverEventArgs ea)
|
|
||||||
{
|
|
||||||
var key = $"{ea.Exchange}-{ea.RoutingKey}";
|
|
||||||
|
|
||||||
if (!_consumers.TryGetValue(key, out var consumers))
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var consumer in consumers)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await consumer(ea.Body);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error while handling event in queue {RoutingKey}", ea.RoutingKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_pendingActions.Enqueue(new SendAcknowledgement(ea.DeliveryTag));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void EnqueueMessage(string queueName, object message)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_pendingActions.Enqueue(new SendMessage(queueName, string.Empty, message));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void EnqueueExchangeMessage(string exchange, object message, string routingKey = "")
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Skipping enqueueing of message");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
_pendingActions.Enqueue(new SendMessage(routingKey, exchange, message));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
||||||
{
|
|
||||||
if (!_configuration.Enabled)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("RabbitMQ is disabled. Message handling loop will not be started");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
await HandleEnqueuedActions();
|
|
||||||
|
|
||||||
await Task.Delay(1000, stoppingToken);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task HandleEnqueuedActions()
|
|
||||||
{
|
|
||||||
while (_pendingActions.TryDequeue(out var action))
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
await action.Perform(_channel!);
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error while trying to send a rabbitmq message");
|
|
||||||
_pendingActions.Enqueue(action);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +1,9 @@
|
|||||||
namespace CleanArchitecture.Domain.Rabbitmq;
|
namespace CleanArchitecture.Domain.Settings;
|
||||||
|
|
||||||
public sealed class RabbitMqConfiguration
|
public sealed class RabbitMqConfiguration
|
||||||
{
|
{
|
||||||
public string Host { get; set; } = string.Empty;
|
public string Host { get; set; } = string.Empty;
|
||||||
public int Port { get; set; }
|
public int Port { get; set; }
|
||||||
public bool Enabled { get; set; }
|
|
||||||
public string Username { get; set; } = string.Empty;
|
public string Username { get; set; } = string.Empty;
|
||||||
public string Password { get; set; } = string.Empty;
|
public string Password { get; set; } = string.Empty;
|
||||||
|
|
@ -12,10 +12,10 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="MediatR" Version="12.4.1" />
|
<PackageReference Include="MediatR" Version="12.4.1" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.2" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.2" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.3" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="9.0.2" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="9.0.3" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.2">
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="9.0.3">
|
||||||
<PrivateAssets>all</PrivateAssets>
|
<PrivateAssets>all</PrivateAssets>
|
||||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||||
</PackageReference>
|
</PackageReference>
|
||||||
|
@ -13,8 +13,8 @@
|
|||||||
<PrivateAssets>all</PrivateAssets>
|
<PrivateAssets>all</PrivateAssets>
|
||||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||||
</PackageReference>
|
</PackageReference>
|
||||||
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.2" />
|
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.3" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.2" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="9.0.3" />
|
||||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
|
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
|
||||||
<PackageReference Include="NUnit" Version="4.3.2" />
|
<PackageReference Include="NUnit" Version="4.3.2" />
|
||||||
<PackageReference Include="NUnit.Analyzers" Version="4.6.0">
|
<PackageReference Include="NUnit.Analyzers" Version="4.6.0">
|
||||||
|
@ -66,7 +66,7 @@ internal class GlobalSetupFixture
|
|||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Creation of the respawner can fail if the database has not been created yet
|
// Creation of the respawner can fail if the database has not been created yet
|
||||||
TestContext.WriteLine($"Failed to create respawner: {ex.Message}");
|
await TestContext.Out.WriteLineAsync($"Failed to create respawner: {ex.Message}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,9 +13,9 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Google.Protobuf" Version="3.29.3" />
|
<PackageReference Include="Google.Protobuf" Version="3.30.1" />
|
||||||
<PackageReference Include="Google.Protobuf.Tools" Version="3.29.3" />
|
<PackageReference Include="Google.Protobuf.Tools" Version="3.30.1" />
|
||||||
<PackageReference Include="Grpc.AspNetCore" Version="2.67.0" />
|
<PackageReference Include="Grpc.AspNetCore" Version="2.70.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -10,15 +10,15 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
<FrameworkReference Include="Microsoft.AspNetCore.App" />
|
||||||
|
|
||||||
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.2.0" />
|
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="9.3.0" />
|
||||||
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="9.1.0" />
|
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="9.1.0" />
|
||||||
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.1" />
|
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.11.2" />
|
||||||
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.1" />
|
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.11.2" />
|
||||||
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.0" />
|
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.11.1" />
|
||||||
<PackageReference Include="OpenTelemetry.Instrumentation.EntityFrameworkCore" Version="1.0.0-beta.12" />
|
<PackageReference Include="OpenTelemetry.Instrumentation.EntityFrameworkCore" Version="1.0.0-beta.12" />
|
||||||
<PackageReference Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.9.0-beta.1" />
|
<PackageReference Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.9.0-beta.1" />
|
||||||
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.0" />
|
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.11.1" />
|
||||||
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.0" />
|
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.11.1" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
|
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<PackageReference Include="MassTransit" Version="8.3.7" />
|
||||||
<PackageReference Include="MediatR" Version="12.4.1" />
|
<PackageReference Include="MediatR" Version="12.4.1" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using MassTransit;
|
||||||
using MediatR;
|
using MediatR;
|
||||||
|
|
||||||
namespace CleanArchitecture.Shared.Events;
|
namespace CleanArchitecture.Shared.Events;
|
||||||
|
|
||||||
|
[ExcludeFromTopology]
|
||||||
public abstract class DomainEvent : Message, INotification
|
public abstract class DomainEvent : Message, INotification
|
||||||
{
|
{
|
||||||
public DateTime Timestamp { get; private set; }
|
public DateTime Timestamp { get; private set; }
|
||||||
|
18
CleanArchitecture.Shared/Events/FanoutDomainEvent.cs
Normal file
18
CleanArchitecture.Shared/Events/FanoutDomainEvent.cs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace CleanArchitecture.Shared.Events;
|
||||||
|
|
||||||
|
public class FanoutDomainEvent : DomainEvent
|
||||||
|
{
|
||||||
|
public DomainEvent DomainEvent { get; }
|
||||||
|
public Guid? UserId { get; }
|
||||||
|
|
||||||
|
public FanoutDomainEvent(
|
||||||
|
Guid aggregateId,
|
||||||
|
DomainEvent domainEvent,
|
||||||
|
Guid? userId) : base(aggregateId)
|
||||||
|
{
|
||||||
|
DomainEvent = domainEvent;
|
||||||
|
UserId = userId;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user