【橙子老哥】C# RabbitMQ-RPC/发布确认/延迟队列(二)

摘要:GRPC,这rabbitmq怎么也和这玩意儿挂上关系了?其实我们把这个单词叫全,大家就清楚了(Remote Procedure Call),是一种计算机通信协议,允许程序在不同的地址空间(通常是不同的计算机)上执行代码有人疑惑了,在web程序里面,我们http

hello,大家好,欢迎来到橙子老哥的分享时刻,希望大家一起学习,一起进步。

欢迎加入.net意社区,第一时间了解我们的动态,文章第一时间分享至社区

添加是的,没有错,意社区正式上线了小程序移动版本哦~欢迎大家前来体验 另外,社区商城开放,超多纪念品送到家

也支持pc:#小程序://意Net/yKytCpkgoMN1rUH

今天,我们来继续上一期的

我们在上一期,实操了前5个模式,因为篇幅有限,以及前5种模式用的比较多,还剩下rpc和发布者确认留在下一期讲

简单模式(Simple)

工作队列模式(Work Queue)

发布订阅模式(Publish/Subscribe)

路由模式(Routing)

通配符模式(Topics)

Rpc模式(RPC)

发布确认模式(Publish Confirms)

废话少说,直接来吧!

GRPC,这rabbitmq怎么也和这玩意儿挂上关系了?其实我们把这个单词叫全,大家就清楚了(Remote Procedure Call),是一种计算机通信协议,允许程序在不同的地址空间(通常是不同的计算机)上执行代码有人疑惑了,在web程序里面,我们http接口调用,不都是在远程过程调用吗?其实这类似异步和多线程的区别,不是一个维度的东西

Http是一个通讯协议

RPC是一种通讯模式

举个最明显的区别,大家熟悉的gRpc还是还是基于Http2.0呢

这里说的是通过Rabbitmq实现RPC模式,是基于Rabbitmq,而不是Http

如果客户端要去调用服务端的一个方法,我们来看看Rabbitmq是怎么做的

上图看起来东西很多,其实不然,我们把上图灰色的先去掉,客户端发送请求,resquest请求丢到一个队列,服务端完成之后再把response响应丢到另一个队列,客户端去接收,不就完成了?灰色的方块只是对消息的标记,我们给每个消息顶一个(reply_to)服务端回复队列和(correlation_id)追踪id,客户端发送消息到rpc_queue中,服务端通过解析reply_to把消息转到对应的队列中,客户端再通过订阅这个订阅,根据correlation_id解析对应的数据即可了

我们直接上代码:客户端:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Collections.Concurrent;
using System.Text;

public class Rpc
{
//我们的客户端
public static async Task Main(string args)
{
Console.WriteLine("RPC Client");
string n = args.Length > 0 ? args[0] : "30";

//这里去调用服务端方法
await InvokeAsync(n);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine;
}

private static async Task InvokeAsync(string n)
{
//我们封装一个rpc操作类
var rpcClient = new RpcClient;
//初始化一下
await rpcClient.StartAsync;

Console.WriteLine(" [x] Requesting fib({0})", n);
//发送请求,接收response
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
//打印结果,是不是很简单了,
}
}

//因为客户端调用,每次都要写很多重复的代码,特别是对异步操作,所以这里客户端要封装了一个类
public class RpcClient : IAsyncDisposable
{
private const string QUEUE_NAME = "rpc_queue";

private readonly IConnectionFactory _connectionFactory;

//其他没啥,这里注意客户端放一个TaskCompletionSource的安全字典
//TaskCompletionSource是c#中异步的类,用于可以手动操作异步的结果
private readonly ConcurrentDictionary> _callbackMapper
= new;

private IConnection? _connection;
private IChannel? _channel;
private string? _replyQueueName;

//初始化一下
public RpcClient
{
_connectionFactory = new ConnectionFactory { HostName = "localhost" };
}

//准备调用,进行声明,这里将服务端要返回的响应队列,进行订阅
public async Task StartAsync
{
//创建连接
_connection = await _connectionFactory.CreateConnectionAsync;
//创建信道
_channel = await _connection.CreateChannelAsync;

// 声明一个队列,并且将队列名进行保存,方便下面call方法直接调用,,一个客户端一个
QueueDeclareOk queueDeclareResult = await _channel.QueueDeclareAsync;
_replyQueueName = queueDeclareResult.QueueName;
var consumer = new AsyncEventingbasicConsumer(_channel);

//订阅消息
consumer.ReceivedAsync += (model, ea) =>
{
//这里我们可以从消息中的BasicProperties,获取到CorrelationId 追踪id
string? correlationId = ea.BasicProperties.CorrelationId;

if (false == string.IsOrEmpty(correlationId))
{
//如果消息响应到了,这将存储的TaskCompletionSource设置一个数据和状态,并在集合中移除即可
if (_callbackMapper.TryRemove(correlationId, out var tcs))
{
var body = ea.Body.ToArray;
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
}
}

return Task.CompletedTask;
};
//订阅
await _channel.BasicConsumeAsync(_replyQueueName, true, consumer);
}

//客户端真正去调用服务端方法
public async Task CallAsync(string message,
CancellationToken cancellationToken = default)
{
if (_channel is )
{
throw new InvalidOperationException;
}

//创建一个追踪id
string correlationId = Guid.NewGuid.ToString;

//这里我们发送消息的时候,可以给消息加个基础信息,就是CorrelationId和ReplyTo
var props = new BasicProperties
{
CorrelationId = correlationId,
ReplyTo = _replyQueueName
};

//创建TaskCompletionSource,结果是string
var tcs = new TaskCompletionSource(
TaskCreationOptions.RunContinuationsAsynchronously);
//加入集合,需要start的消息消费
_callbackMapper.TryAdd(correlationId, tcs);

//发送消息,这里的routingKey写死,并且传入basicProperties
var messagebytes = Encoding.UTF8.GetBytes(message);
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: QUEUE_NAME,
mandatory: true, basicProperties: props, body: messageBytes);

//通过CancellationTokenRegistration进行管理cancellationToken的注销
//CancellationTokenRegistration这个也是c#中异步的类,用于管理cancellationToken
using CancellationTokenRegistration ctr =
cancellationToken.Register( =>
{
_callbackMapper.TryRemove(correlationId, out _);
tcs.SetCanceled;
});

//对TaskCompletionSource进行异步等待,返回结果即可
return await tcs.Task;
}

public async ValueTask DisposeAsync
{
if (_channel is not )
{
await _channel.CloseAsync;
}

if (_connection is not )
{
await _connection.CloseAsync;
}
}
}

服务端:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

const string QUEUE_NAME = "rpc_queue";

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync;
using var channel = await connection.CreateChannelAsync;

//服务端订阅订阅,就是客户端发送的那个队列QUEUE_NAME
await channel.QueueDeclareAsync(queue: QUEUE_NAME, durable: false, exclusive: false,
autoDelete: false, arguments: );

//消费者设置,当有1个未确认的消息,不允许再接收,设置仅影响当前通道上的消费者,而不是全局设置
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs ea) =>
{
AsyncEventingBasicConsumer cons = (AsyncEventingBasicConsumer)sender;
IChannel ch = cons.Channel;
string response = string.Empty;

byte body = ea.Body.ToArray;
//这里,我们从刚刚客户端消息的BasicProperties,拿出CorrelationId

//将CorrelationId进行一个透传
IReadOnlyBasicProperties props = ea.BasicProperties;
var replyProps = new BasicProperties
{
CorrelationId = props.CorrelationId
};

try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine($" [.] Fib({message})");

//运行服务端的方法
response = Fib(n).ToString;
}
catch (Exception e)
{
Console.WriteLine($" [.] {e.Message}");
response = string.Empty;
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);

//服务端客户端给的CorrelationId进行区分,找到对应的回复id要回复到哪个队列上,props.ReplyTo
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: props.ReplyTo!,
mandatory: true, basicProperties: replyProps, body: responseBytes);

//消息确认
await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
}
};

await channel.BasicConsumeAsync(QUEUE_NAME, false, consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine;

//这段方法,是服务端的,客户端要远程调用这个方法
static int Fib(int n)
{
if (n is 0 or 1)
{
return n;
}

return Fib(n - 1) + Fib(n - 2);
}
而开启发布者确认模式后,RabbitMQ 会在成功接收消息后,向发布者发送确认,确保消息已被正确接收或者说,不知道啊,为了提高生产者发布的消息到队列中的正确率

当然,通常这种情况非常的极端

RabbitMQ 的发布者确认模式(Publisher Confirms)主要用于解决发布消息(message)到队列(queue)的可靠传输问题。当你启用发布者确认模式时,发布者会等待消息被RabbitMQ接收,或者通知发布者消息未能成功接收

发布者确认是AMQP 0.9.1 协议的 RabbitMQ 扩展, 因此,默认情况下不会启用它们。

我们只需要在生产者创建连接的时候,进行声明下即可

var channelOpts = new CreateChannelOptions(
//设置开启发布者消息确认
publisherConfirmationsEnabled: true,
//开启发布者确认追踪
publisherConfirmationTrackingEnabled: true,
//限流
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

这里确认方式,又有很多了,官方提供了3种,性能是从低到高,但是对应的实现难度也是指数倍增加

单个确认

批量确认

异步确认

生产者代码实现:

using System.Buffers.Binary;
using System.Diagnostics;
using System.Text;
using RabbitMQ.Client;

const ushort MAX_OUTSTANDING_CONFIRMS = 256;

const int MESSAGE_COUNT = 50_000;
bool debug = false;

var channelOpts = new CreateChannelOptions(
//设置开启发布者消息确认
publisherConfirmationsEnabled: true,
//开启发布者确认追踪
publisherConfirmationTrackingEnabled: true,
//限流
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
);

var props = new BasicProperties
{
Persistent = true
};

string hostname = "localhost";
if (args.Length > 0)
{
if (false == string.IsOrWhiteSpace(args[0]))
{
hostname = args[0];
}
}

//单个确认
await PublishMessagesIndividuallyAsync;

//批量确认
await PublishMessagesInBatchAsync;

//异步确认
await HandlePublishConfirmsAsynchronously;

Task CreateConnectionAsync
{
var factory = new ConnectionFactory { HostName = hostname };
return factory.CreateConnectionAsync;
}

//单个单个确认
//这种技术非常简单,但也有一个主要缺点: 它会显著减慢发布速度,因为消息的确认会阻止发布 的所有后续消息。这种方法不会提供 每秒发布几百条消息。不过,这可能是 对于某些应用程序来说已经足够好了。
async Task PublishMessagesIndividuallyAsync
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");

await using IConnection connection = await CreateConnectionAsync;
//将定义的channelOpts配置信道
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// 声明队列
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync;
string queueName = queueDeclareResult.QueueName;

var sw = new Stopwatch;
sw.Start;

for (int i = 0; i < MESSAGE_COUNT; i++)
{
byte body = Encoding.UTF8.GetBytes(i.ToString);
try
{
//发送5w条持久消息,设置消息mandatory失败告诉生产者
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, basicProperties: props, mandatory: true);
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}");
}
}

sw.Stop;

Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
}

//批量确认
//一个缺点是,在发生故障时,我们不知道到底出了什么问题
async Task PublishMessagesInBatchAsync
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");

int batchSize = MAX_OUTSTANDING_CONFIRMS / 2;
int outstandingMessageCount = 0;

sw.Start;

//将生产消息装到这里
var publishTasks = new List;

{

//这里注意,并没一个一个去执行,而是将一批的消息异步执行
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
outstandingMessageCount++;

//当循环到需要确认的值的时候(现在设置的是一半)
if (outstandingMessageCount == batchSize)
{
foreach (ValueTask pt in publishTasks)
{
try
{
//等到批量的确认结果
await pt;
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
}
}
publishTasks.Clear;
outstandingMessageCount = 0;
}
}

//再做一次将消息清空
if (publishTasks.Count > 0)
{

{
try
{
await pt;
}
catch (Exception ex)
{

}
}
publishTasks.Clear;

}

sw.Stop;
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
}

//异步确认
//这里性能最佳,带代码也是最繁琐的
async Task HandlePublishConfirmsAsynchronously
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");

channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);

// declare a server-named queue

var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var outstandingConfirms = new LinkedList;
var semaphore = new SemaphoreSlim(1, 1);
int confirmedCount = 0;
async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
if (debug)
{
Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
DateTime.Now, deliveryTag, multiple);
}

await semaphore.WaitAsync;
try
{
if (multiple)
{
do
{
LinkedListNode? node = outstandingConfirms.First;
if (node is )
{
break;
}
if (node.Value <= deliveryTag)
{
outstandingConfirms.RemoveFirst;
}
else
{
break;
}

confirmedCount++;
} while (true);
}
else
{
confirmedCount++;
outstandingConfirms.Remove(deliveryTag);
}
}
finally
{
semaphore.Release;
}

if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
{
allMessagesConfirmedTcs.SetResult(true);
}
}

channel.BasicReturnAsync += (sender, ea) =>
{
ulong sequenceNumber = 0;

IReadOnlyBasicProperties props = ea.BasicProperties;
if (props.Headers is not )
{
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
if (maybeSeqNum is not )
{
sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte)maybeSeqNum);
}
}

Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
return CleanOutstandingConfirms(sequenceNumber, false);
};

channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacksAsync += (sender, ea) =>
{
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
return CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

sw.Start;

var publishTasks = new List>;

{
string msg = i.ToString;
byte body = Encoding.UTF8.GetBytes(msg);
ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync;
if ((ulong)(i + 1) != nextPublishSeqNo)
{
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");
}

try
{
outstandingConfirms.AddLast(nextPublishSeqNo);
}
finally
{
semaphore.Release;
}

string rk = queueName;
if (i % 1000 == 0)
{
// This will cause a basic.return, for fun
rk = Guid.NewGuid.ToString;
}
(ulong, ValueTask) data =
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
publishTasks.Add(data);
}

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
{
try
{
await datum.PublishTask;
}
catch (Exception ex)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack, seqNo: '{datum.SeqNo}', ex: '{ex}'");
}
}

try
{
await allMessagesConfirmedTcs.Task.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
Console.Error.WriteLine("{0} [ERROR] all messages could not be published and confirmed within 10 seconds", DateTime.Now);
}
catch (TimeoutException)
{

}

sw.Stop;
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {sw.ElapsedMilliseconds:N0} ms");
}
4、延迟队列

最后,就是rabbitmq中说烂了的延迟队列,主要用于需要延迟执行的消息,例如培训机构最喜欢的电商系统中,取消订单的操作 rabbitmq要实现这个,有两种方式

安装插件rabbitmq_delayed_message_exchange

使用TTL(Time-To-Live)和队列的死信交换机(Dead Letter Exchange)来实现

4.1 使用插件:在申明交换机的时候,选择x-delayed-message类型即可await channel.ExchangeDeclareAsync("my-exchange", "x-delayed-message",durable:true,autoDelete:false,
new Dictionary
{
{"x-delayed-type", "direct"}
});
4.2 TTL和DLXTTL:说白了,就是可以设置消息的过期时间,如果超出时间没有消费,就是DLX:当一个队列中的消息成为死信后,如果这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机。通过这种方式,可以将各种原因导致的失败消息集中到一个特定的地方,以便于分析和处理。

这里大家可能有个误区,认为死信交换机是一种特殊的交换机,其实并不是,它之所以叫做死信交换机,是因为有其他队列把死信消息绑定给了它

消息先到了TTL队列,等5秒过期之后,就到了死信队列,消费者即时消费这个死信队列即可

请注意,声明的这个TTL严禁有任何的消费者,或者TTL都过期不了直接被消费了

生产者:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class Program
{
const string QueueName = "normal_queue";
const string DlxExchangeName = "dlx_exchange";
const string DlxQueueName = "delay_queue";

static async Task Main(string args)
{
var factory = new ConnectionFactory { HostName = "xxx",Port = 5672,Password = "xxx",UserName = "xxx"};
await using var connection = await factory.CreateConnectionAsync;
await using var channel =await connection.CreateChannelAsync;

// 声明死信交换机,其实和正常交换机没有任何区别
await channel.ExchangeDeclareAsync(DlxExchangeName, ExchangeType.Fanout,durable:true,autoDelete:false);

// 声明死信队列,其实和正常交换机没有任何区别
await channel.QueueDeclareAsync(DlxQueueName,exclusive:false, durable: true, autoDelete: false);
await channel.QueueBindAsync(DlxQueueName, DlxExchangeName,string.Empty);

// 声明TTL队列,和死信队列绑定
var arguments = new Dictionary
{
{"x-dead-letter-exchange",DlxExchangeName},
{"x-message-ttl", 5000 } // 设置 TTL为5000ms (5秒)
};
await channel.QueueDeclareAsync(QueueName, durable: true,exclusive:false, autoDelete: false,arguments:arguments);

// 发送消息到延迟队列
var message = "这是一个延迟消息";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: string.Empty,routingKey:QueueName, body);
Console.WriteLine(" [x] Sent {0}", message);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine;
}
}
消费者直接正常消费死信队列即可

Rabbitmq 只有这些内容吗?当然不止!各各组件特性搭配,根据业务,玩出花都可以

最后再提一嘴,望大家来刚上线的意社区小程序看上几眼,不会让你失望

.Net意社区,高频发布原创有深度的.Net相关知识内容

与你一起学习,一起进步

来源:opendotnet

相关推荐