0
0
mirror of https://github.com/alex289/CleanArchitecture.git synced 2025-08-23 19:58:34 +00:00

Add fanout event handler

This commit is contained in:
alex289 2023-09-02 11:14:01 +02:00
parent 4746bfbef1
commit bff6fb47f2
No known key found for this signature in database
GPG Key ID: 573F77CD2D87F863
7 changed files with 61 additions and 4 deletions

View File

@ -0,0 +1,6 @@
namespace CleanArchitecture.Domain.Constants;
public sealed class Messaging
{
public const string ExchangeNameNotifications = "exchange-notifications";
}

View File

@ -3,6 +3,7 @@ using MediatR;
namespace CleanArchitecture.Domain.DomainEvents; namespace CleanArchitecture.Domain.DomainEvents;
// Todo: Move this and all events to shared
public abstract class DomainEvent : Message, INotification public abstract class DomainEvent : Message, INotification
{ {
public DateTime Timestamp { get; private set; } public DateTime Timestamp { get; private set; }

View File

@ -0,0 +1,26 @@
using System.Threading.Tasks;
using CleanArchitecture.Domain.Constants;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.Rabbitmq;
namespace CleanArchitecture.Domain.EventHandler.Fanout;
public sealed class FanoutEventHandler : IFanoutEventHandler
{
private readonly RabbitMqHandler _rabbitMqHandler;
public FanoutEventHandler(
RabbitMqHandler rabbitMqHandler)
{
_rabbitMqHandler = rabbitMqHandler;
}
public Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event)
{
_rabbitMqHandler.EnqueueExchangeMessage(
Messaging.ExchangeNameNotifications,
@event);
return Task.FromResult(@event);
}
}

View File

@ -0,0 +1,9 @@
using System.Threading.Tasks;
using CleanArchitecture.Domain.DomainEvents;
namespace CleanArchitecture.Domain.EventHandler.Fanout;
public interface IFanoutEventHandler
{
Task<DomainEvent> HandleDomainEventAsync(DomainEvent @event);
}

View File

@ -6,7 +6,9 @@ using CleanArchitecture.Domain.Commands.Users.CreateUser;
using CleanArchitecture.Domain.Commands.Users.DeleteUser; using CleanArchitecture.Domain.Commands.Users.DeleteUser;
using CleanArchitecture.Domain.Commands.Users.LoginUser; using CleanArchitecture.Domain.Commands.Users.LoginUser;
using CleanArchitecture.Domain.Commands.Users.UpdateUser; using CleanArchitecture.Domain.Commands.Users.UpdateUser;
using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.EventHandler; using CleanArchitecture.Domain.EventHandler;
using CleanArchitecture.Domain.EventHandler.Fanout;
using CleanArchitecture.Domain.Events.Tenant; using CleanArchitecture.Domain.Events.Tenant;
using CleanArchitecture.Domain.Events.User; using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.Interfaces; using CleanArchitecture.Domain.Interfaces;
@ -36,6 +38,9 @@ public static class ServiceCollectionExtension
public static IServiceCollection AddNotificationHandlers(this IServiceCollection services) public static IServiceCollection AddNotificationHandlers(this IServiceCollection services)
{ {
// Fanout
services.AddScoped<IFanoutEventHandler, FanoutEventHandler>();
// User // User
services.AddScoped<INotificationHandler<UserCreatedEvent>, UserEventHandler>(); services.AddScoped<INotificationHandler<UserCreatedEvent>, UserEventHandler>();
services.AddScoped<INotificationHandler<UserUpdatedEvent>, UserEventHandler>(); services.AddScoped<INotificationHandler<UserUpdatedEvent>, UserEventHandler>();

View File

@ -2,6 +2,7 @@ using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands.Users.DeleteUser; using CleanArchitecture.Domain.Commands.Users.DeleteUser;
using CleanArchitecture.Domain.DomainEvents; using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.EventHandler.Fanout;
using CleanArchitecture.Domain.Events.User; using CleanArchitecture.Domain.Events.User;
using CleanArchitecture.Domain.Notifications; using CleanArchitecture.Domain.Notifications;
using MediatR; using MediatR;
@ -17,8 +18,9 @@ public sealed class InMemoryBusTests
{ {
var mediator = Substitute.For<IMediator>(); var mediator = Substitute.For<IMediator>();
var domainEventStore = Substitute.For<IDomainEventStore>(); var domainEventStore = Substitute.For<IDomainEventStore>();
var fanoutEventHandler = Substitute.For<IFanoutEventHandler>();
var inMemoryBus = new InMemoryBus(mediator, domainEventStore); var inMemoryBus = new InMemoryBus(mediator, domainEventStore, fanoutEventHandler);
const string key = "Key"; const string key = "Key";
const string value = "Value"; const string value = "Value";
@ -36,8 +38,9 @@ public sealed class InMemoryBusTests
{ {
var mediator = Substitute.For<IMediator>(); var mediator = Substitute.For<IMediator>();
var domainEventStore = Substitute.For<IDomainEventStore>(); var domainEventStore = Substitute.For<IDomainEventStore>();
var fanoutEventHandler = Substitute.For<IFanoutEventHandler>();
var inMemoryBus = new InMemoryBus(mediator, domainEventStore); var inMemoryBus = new InMemoryBus(mediator, domainEventStore, fanoutEventHandler);
var userDeletedEvent = new UserDeletedEvent(Guid.NewGuid(), Guid.NewGuid()); var userDeletedEvent = new UserDeletedEvent(Guid.NewGuid(), Guid.NewGuid());
@ -51,8 +54,9 @@ public sealed class InMemoryBusTests
{ {
var mediator = Substitute.For<IMediator>(); var mediator = Substitute.For<IMediator>();
var domainEventStore = Substitute.For<IDomainEventStore>(); var domainEventStore = Substitute.For<IDomainEventStore>();
var fanoutEventHandler = Substitute.For<IFanoutEventHandler>();
var inMemoryBus = new InMemoryBus(mediator, domainEventStore); var inMemoryBus = new InMemoryBus(mediator, domainEventStore, fanoutEventHandler);
var deleteUserCommand = new DeleteUserCommand(Guid.NewGuid()); var deleteUserCommand = new DeleteUserCommand(Guid.NewGuid());

View File

@ -1,6 +1,7 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using CleanArchitecture.Domain.Commands; using CleanArchitecture.Domain.Commands;
using CleanArchitecture.Domain.DomainEvents; using CleanArchitecture.Domain.DomainEvents;
using CleanArchitecture.Domain.EventHandler.Fanout;
using CleanArchitecture.Domain.Interfaces; using CleanArchitecture.Domain.Interfaces;
using MediatR; using MediatR;
@ -10,13 +11,16 @@ public sealed class InMemoryBus : IMediatorHandler
{ {
private readonly IDomainEventStore _domainEventStore; private readonly IDomainEventStore _domainEventStore;
private readonly IMediator _mediator; private readonly IMediator _mediator;
private readonly IFanoutEventHandler _fanoutEventHandler;
public InMemoryBus( public InMemoryBus(
IMediator mediator, IMediator mediator,
IDomainEventStore domainEventStore) IDomainEventStore domainEventStore,
IFanoutEventHandler fanoutEventHandler)
{ {
_mediator = mediator; _mediator = mediator;
_domainEventStore = domainEventStore; _domainEventStore = domainEventStore;
_fanoutEventHandler = fanoutEventHandler;
} }
public Task<TResponse> QueryAsync<TResponse>(IRequest<TResponse> query) public Task<TResponse> QueryAsync<TResponse>(IRequest<TResponse> query)
@ -29,6 +33,8 @@ public sealed class InMemoryBus : IMediatorHandler
await _domainEventStore.SaveAsync(@event); await _domainEventStore.SaveAsync(@event);
await _mediator.Publish(@event); await _mediator.Publish(@event);
await _fanoutEventHandler.HandleDomainEventAsync(@event);
} }
public Task SendCommandAsync<T>(T command) where T : CommandBase public Task SendCommandAsync<T>(T command) where T : CommandBase