diff --git a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs b/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs index 1d70e99..3595632 100644 --- a/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs +++ b/CleanArchitecture.Domain/Rabbitmq/RabbitMqHandler.cs @@ -21,6 +21,7 @@ public sealed class RabbitMqHandler : BackgroundService private readonly ConcurrentQueue _pendingActions = new(); private IChannel? _channel; + private IConnection? _connection; public RabbitMqHandler( RabbitMqConfiguration configuration, @@ -49,18 +50,35 @@ public sealed class RabbitMqHandler : BackgroundService Password = _configuration.Password }; - var connection = await factory.CreateConnectionAsync(cancellationToken); - _channel = await connection.CreateChannelAsync(null, cancellationToken); + _connection = await factory.CreateConnectionAsync(cancellationToken); + _channel = await _connection.CreateChannelAsync(null, cancellationToken); await base.StartAsync(cancellationToken); } + public override async Task StopAsync(CancellationToken cancellationToken) + { + if (_channel is not null) + { + await _channel.CloseAsync(cancellationToken); + await _channel.DisposeAsync(); + } + + if (_connection is not null) + { + await _connection.CloseAsync(cancellationToken); + await _connection.DisposeAsync(); + } + + await base.StopAsync(cancellationToken); + } + public void InitializeExchange(string exchangeName, string type = ExchangeType.Fanout) { if (!_configuration.Enabled) { - _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of exchange {exchangeName}.", + _logger.LogInformation("RabbitMQ is disabled. Skipping the creation of exchange {ExchangeName}.", exchangeName); return; } @@ -214,21 +232,50 @@ public sealed class RabbitMqHandler : BackgroundService return; } - while (true) + while (!stoppingToken.IsCancellationRequested) { - await HandleEnqueuedActions(); + var channel = await GetOpenChannelAsync(stoppingToken); + + if (channel is not null) + { + await HandleEnqueuedActionsAsync(channel); + } await Task.Delay(1000, stoppingToken); } } - private async Task HandleEnqueuedActions() + private async ValueTask GetOpenChannelAsync(CancellationToken stoppingToken) + { + var channel = _channel; + + if (_connection is null || channel is null || channel.IsClosed) + { + return null; + } + + try + { + channel = await _connection.CreateChannelAsync(null, stoppingToken); + _channel = channel; + + return channel; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error while trying to create a new channel"); + } + + return null; + } + + private async Task HandleEnqueuedActionsAsync(IChannel channel) { while (_pendingActions.TryDequeue(out var action)) { try { - await action.Perform(_channel!); + await action.Perform(channel); } catch (Exception ex) {