.NET 中通过依赖注入(DI)管理 RabbitMQ.Client 7.0 的生命周期

360影视 2024-12-05 08:31 4

摘要:在 RabbitMQ.Client 7.0.0 中,IModel 已被重命名为 IChannel(该重命名自 7.0.0-alpha2 版本起生效),应使用 IChannel 替代 IModel;IChannel 不再提供 CreateBasicProperties 方法。

在 RabbitMQ.Client 7.0.0 中,IModel 已被重命名为 IChannel(该重命名自 7.0.0-alpha2 版本起生效),现在应该使用 IChannel 替代 IModel。IChannel 不再提供 CreateBasicProperties 方法,需要直接使用 BasicProperties 类来创建消息属性。

下面介绍如何通过依赖注入(DI)来管理 RabbitMQ 客户端的生命周期。

首先,你需要安装RabbitMQ的.NET客户端库。这可以通过NuGet包管理器来完成:

Install-Package RabbitMQ.Client

在你的配置文件(例如 appsettings.json)中添加 RabbitMQ 的连接配置,并定义与之对应的选项类:

public class RabbitMQOptions
{
public string HostName { get; set; }
public int Port { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
}

在 Startup.cs 或程序启动时的配置方法中,注册 RabbitMQ 服务:

// 绑定RabbitMQ配置
// The "RabbitMQ" section is expected to carry the HostName, Port, UserName and Password
// keys that RabbitMQOptions exposes.
builder.Services.Configure<RabbitMQOptions>(builder.Configuration.GetSection("RabbitMQ"));

// Register one shared connection wrapper for the whole application.
// It is registered under IRabbitMQConnection because that is the type RabbitMQService asks for.
builder.Services.AddSingleton<IRabbitMQConnection>(sp =>
{
    var options = sp.GetRequiredService<Microsoft.Extensions.Options.IOptions<RabbitMQOptions>>().Value;
    var factory = new ConnectionFactory
    {
        HostName = options.HostName,
        Port = options.Port,
        UserName = options.UserName,
        Password = options.Password
    };
    return new RabbitMQConnection(factory);
});

// Register RabbitMQService so it can be resolved from the container.
builder.Services.AddSingleton<RabbitMQService>();

/// <summary>
/// Abstraction over a RabbitMQ connection that hands out channels on demand.
/// </summary>
public interface IRabbitMQConnection : IDisposable
{
    /// <summary>Creates a new channel on the underlying connection.</summary>
    /// <returns>A task that completes with the newly created channel.</returns>
    Task<IChannel> CreateChannel();
}

public class RabbitMQConnection : IRabbitMQConnection
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private bool _isDisposed;

/// <summary>
/// Stores the factory and opens the connection that this instance holds in <c>_connection</c>.
/// </summary>
/// <param name="factory">Factory used to create the connection.</param>
/// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
public RabbitMQConnection(ConnectionFactory factory)
{
    // ArgumentNullException takes the parameter name; ArgumentException(string) would have
    // treated nameof(factory) as the error message.
    _factory = factory ?? throw new ArgumentNullException(nameof(factory));

    // Constructors cannot be async, so the connection task is awaited synchronously here.
    // NOTE(review): .Result wraps failures in AggregateException and blocks the calling thread.
    _connection = factory.CreateConnectionAsync().Result;
}

/// <summary>Creates a new channel on the connection held by this instance.</summary>
/// <returns>A task that completes with the newly created channel.</returns>
/// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
public async Task<IChannel> CreateChannel()
{
    EnsureNotDisposed();
    return await _connection.CreateChannelAsync();
}

/// <summary>
/// Runs the disposing cleanup path and tells the GC that the finalizer no longer needs to run.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>Releases the connection held by this instance; safe to call more than once.</summary>
/// <param name="disposing">
/// <c>true</c> when called from <c>Dispose()</c>; <c>false</c> when called from the finalizer.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (_isDisposed)
    {
        return;
    }

    if (disposing)
    {
        // The connection is a managed object, so it is only touched on the explicit
        // Dispose() path and never from the finalizer thread.
        // _connection is still null when the constructor threw before assigning it.
        _connection?.Dispose();
    }

    _isDisposed = true;
}

/// <summary>Finalizer: runs the non-disposing cleanup path.</summary>
~RabbitMQConnection()
{
    Dispose(false);
}

/// <summary>Guards members that must not be used after disposal.</summary>
/// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
private void EnsureNotDisposed()
{
    if (_isDisposed)
    {
        throw new ObjectDisposedException(nameof(RabbitMQConnection));
    }
}
}

在你的服务或消费者中,注入 IRabbitMQConnection 并使用它来创建信道(channel):

using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;

public class RabbitMQService
{
private readonly IRabbitMQConnection _connection;

/// <summary>Creates the service on top of the given connection wrapper.</summary>
/// <param name="connection">Connection wrapper used to create channels.</param>
/// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
public RabbitMQService(IRabbitMQConnection connection)
{
    // ArgumentNullException takes the parameter name; ArgumentException(string) would have
    // treated nameof(connection) as the error message.
    _connection = connection ?? throw new ArgumentNullException(nameof(connection));
}

/// <summary>
/// Serializes <paramref name="message"/> to JSON and publishes it as a persistent message.
/// </summary>
/// <param name="exchange">Exchange to publish to.</param>
/// <param name="routingKey">Routing key for the message.</param>
/// <param name="message">Object to serialize as the UTF-8 JSON message body.</param>
/// <param name="mandatory">Forwarded to the publish call as its mandatory flag.</param>
/// <param name="cancellationToken">Token used to cancel the publish.</param>
/// <remarks>
/// Failures are written to the console and not rethrown, so a completed task does not
/// guarantee that the message was published.
/// </remarks>
public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
{
    try
    {
        // Await the channel itself; the channel is released once the publish has finished.
        await using var channel = await _connection.CreateChannel();

        var mesjson = JsonSerializer.Serialize(message);
        Console.WriteLine("发送消息:" + mesjson);
        var body = Encoding.UTF8.GetBytes(mesjson);

        var properties = new RabbitMQ.Client.BasicProperties
        {
            Persistent = true // mark the message as persistent
        };

        // Await the publish so the channel is not disposed while it is still in flight,
        // and pass the caller's mandatory flag instead of a hard-coded false.
        await channel.BasicPublishAsync(exchange, routingKey, mandatory, properties, body, cancellationToken);
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine($"Operation was canceled: {ex.Message}");
        // Add "throw;" here to propagate the cancellation to the caller.
    }
    catch (Exception ex)
    {
        Console.WriteLine($"An error occurred: {ex.Message}");
        // Add "throw;" here to propagate the error to the caller.
    }
}

/// <summary>
/// Declares a durable queue, consumes it with manual acknowledgements and keeps running
/// until <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <param name="queueName">Queue to declare and consume from.</param>
/// <param name="callback">Handler invoked with the channel and a copy of each message body.</param>
/// <param name="cancellationToken">Token that stops the consumer and ends the returned task.</param>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
/// <exception cref="OperationCanceledException">The token was cancelled.</exception>
/// <remarks>
/// A message is acknowledged after <paramref name="callback"/> completes and is negatively
/// acknowledged with requeue when it throws. The callback must not acknowledge the message itself.
/// NOTE(review): a message that always fails is requeued every time; confirm a dead-letter
/// or retry policy exists for such messages.
/// </remarks>
public async Task ReceiveAsync(string queueName, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(callback);

    // The channel is disposed when this method exits, i.e. when the token is cancelled.
    await using var channel = await _connection.CreateChannel();
    await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null, cancellationToken: cancellationToken);

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.ReceivedAsync += async (sender, ea) =>
    {
        // Hand the callback its own copy of the body.
        var body = ea.Body.ToArray();
        try
        {
            await callback(channel, body);

            // autoAck is false below, so the delivery must be acknowledged explicitly;
            // the callback cannot do it because it never sees the delivery tag.
            await channel.BasicAckAsync(ea.DeliveryTag, multiple: false, cancellationToken);
        }
        catch
        {
            // Return the message to the queue, then let the failure propagate as before.
            await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: true, CancellationToken.None);
            throw;
        }
    };

    await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);

    // Keep the method (and therefore the channel) alive until cancellation is requested.
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
}

消费端:

var app = builder.Build();

// Resolve the service from the container and create the token that stops the consumer.
var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

// 启动消息接收
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{
// 处理接收到的消息
//string message = Encoding.UTF8.GetString(body);
//Console.WriteLine($"收到消息 message: {message}");
//// 确认消息
//await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);

}, cancellationToken);

生产端

来源:opendotnet

相关推荐