摘要:这将同时安装EasyNetQ和其依赖的RabbitMQ.Client库。建议使用DI安装,EasyNetQ.DI.Microsof包含EasyNetQ,同时依赖Newtonsoft.Json
EasyNetQ是一个为.NET环境设计的RabbitMQ客户端API,旨在简化与RabbitMQ的交互。
关于RabbitMq的更多知识点在:
你可以通过NuGet包管理器来安装EasyNetQ。在Package Manager Console中运行以下命令:
PM> Install-Package EasyNetQ这将同时安装EasyNetQ和其依赖的RabbitMQ.Client库。建议使用DI安装,EasyNetQ.DI.Microsof包含EasyNetQ,同时依赖Newtonsoft.Json
varconnectionString = "host=111.111.11.111;virtualHost=/;username=admin;password=123456;timeout=60";
//链接注册
builder.Services.RegisterEasyNetQ("host=8.153.70.182;virtualHost=/;username=zhaoke;password=123123;publisherConfirms=true");
//发布注册
builder.Services.AddTransient;
//订阅注册
builder.Services.AddTransient;
//添加消息处理
builder.Services.AddHostedService;EasyNetQ支持发布/订阅模式,你可以通过创建一个.NET类来定义消息,然后使用Publish方法发布消息。例如:publicclassTextMessage
{
publicstringText {get;set; }
}
bus.Publish(newTextMessage { Text = "Hello World" });
EasyNetQ会根据消息类型自动创建交换机和队列,并使用Newtonsoft.Json序列化消息为JSON格式。
MQPublish的封装using EasyNetQ.Topology;using EasyNetQ;
///
/// 发布消息
///
public class MQPublish
{
private readonly IBus bus;
public MQPublish(IBus bus)
{
this.bus = bus;
}
///
/// 发布消息
///
///
///
public async Task PublishMessageAsync(string routingKey, object data)
{
Console.WriteLine($"MQ消息推送,routingKey :{routingKey} , 推送数据 :{System.Text.Json.JsonSerializer.Serialize(data)}");
var message = new Message(data);
var advancedBus = bus.Advanced;
advancedBus.QueueDeclare(routingKey);
await advancedBus.PublishAsync(Exchange.Default, routingKey, false, message);
}
}
订阅消息时,你需要指定一个订阅ID和一个处理消息的委托。
bus.Subscribe("subscriptionId", message =>{
Console.WriteLine("Received message: " + message.Text);
});
当有消息发布到对应的交换机和队列时,你的订阅就会收到消息。
封装MQSubscribepublic class MQSubscribe{
// MQ消息总线
private readonly IBus bus;
public MQSubscribe(IBus bus)
{
this.bus = bus;
}
///
/// 处理消息的总入口
///
///
public Task Init
{
SubscribeTSysLogVis;
//程序不结束,等待输入
Console.WriteLine($"已启动(处理消息) {DateTime.UtcNow}");
return Task.CompletedTask;
}
private Task SubscribeTSysLogVis
{
var advancedBus = bus.Advanced;
//订阅TSysLogVis日志 - 请不要在两次发布之间重复使用它
var queue = advancedBus.QueueDeclare("TSysLogVis");
advancedBus.Consume(queue, async (body, properties, info) =>
{
try
{
var message = Encoding.UTF8.GetString(body.ToArray);
//var data = JsonConvert.DeserializeObject(message);
Console.WriteLine($"消息处理 {DateTime.Now} : {message}");
//db.Insertable(data).SplitTable.ExecuteReturnSnowflakeId;
}
catch (Exception ex)
{
// 处理异常,例如记录日志或重新抛出
Console.Error.WriteLine($"处理消息时发生异常: {ex}");
}
});
}
}
启用订阅服务即可
publicclassSubscribeWorker:BackgroundService{
privatereadonlyMQSubscribe _Service;
publicSubscribeWorker(MQSubscribe Service)
{
_Service = Service;
}
// 执行逻辑
protectedoverrideasyncTaskExecuteAsync(CancellationToken stoppingToken)
{
await_Service.Init;
}
}
来源:opendotnet
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!