阿里面试题:RocketMQ 保障消息不丢与不被重复消费的实现方式

360影视 2024-12-16 19:01 4

摘要:首先给大家简单介绍一下RocketMQ。RocketMQ 是一个分布式的消息队列系统,作为阿里巴巴开源的消息队列中间件,它有着非常高的可靠性和性能表现。无论是事务消息、延时消息,还是消息顺序消费,RocketMQ 都能轻松应对。

今天咱们聊点有意思的东西——RocketMQ,关于它是如何保证消息不丢失,如何避免消息被重复消费的。

我们时常得面对消息队列的各种“挑战”。特别是消息丢失和重复消费这两大难题,想必每个做过消息队列相关开发的朋友都有一肚子苦水要吐。

放心,今天我们就来好好掰扯掰扯这些问题。

首先给大家简单介绍一下RocketMQ。RocketMQ 是一个分布式的消息队列系统,作为阿里巴巴开源的消息队列中间件,它有着非常高的可靠性和性能表现。无论是事务消息、延时消息,还是消息顺序消费,RocketMQ 都能轻松应对。

但是,说到它如何保证消息不丢失,以及如何防止重复消费的问题,咱们就得更深入地了解它的机制了。别急,慢慢往下看,咱们一步步解析。

在分布式系统中,消息丢失是一个非常致命的问题。为了确保消息的可靠投递,RocketMQ 提供了几种机制来保证消息不丢失。

首先,最重要的一点是消息存储的持久化。RocketMQ 使用了基于文件的存储方式,所有消息都会先被存储到磁盘上,而不是仅仅存储在内存中。这意味着即使消息消费者异常退出,或者 RocketMQ 服务重启,只要磁盘上的数据没有丢失,消息就依然能够恢复。

// 这是RocketMQ的消息存储模式,存储在磁盘上DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");producer.setNamesrvAddr("localhost:9876");producer.start;Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes);SendResult sendResult = producer.send(msg);

在上述代码中,生产者将消息发送到 RocketMQ 的服务器中。生产者会将消息写入到磁盘的消息存储文件中,这样就保证了即使消息被发送后,RocketMQ 服务宕机,消息依然可以被恢复。

RocketMQ 提供了 同步发送异步发送单向发送 的方式来投递消息。在同步发送模式下,RocketMQ 会在消息投递到 Broker 后,等待 Broker 给出一个确认响应(即 ACK),确认消息已经成功接收并存储在磁盘上。如果 Broker 没有给出确认,消息就会被重新投递。这样就避免了消息丢失的情况。

// 同步发送方式,确保消息已经被 Broker 确认SendResult sendResult = producer.send(msg);if (sendResult.getSendStatus != SendStatus.SEND_OK) { // 处理发送失败的情况,可以做重试 System.out.println("消息发送失败,进行重试...");}

为了防止磁盘损坏导致的消息丢失,RocketMQ 采用了 副本机制。每个主题的消息会在多个 Broker 之间进行副本同步。默认情况下,RocketMQ 会为每个消息维护多个副本(一般是2个),保证一个副本丢失时,其他副本可以恢复消息。

// Broker 的副本机制,确保消息不丢失Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes);SendResult sendResult = producer.send(msg);

通过副本机制,即使一个节点宕机,其他节点上仍然有备份,可以恢复消息,避免丢失。

消息重复消费是另一个常见的问题,通常会出现在以下几种场景:

消息被消费者多次消费消息被重新投递到消费者(网络异常等原因)

为了保证每条消息只被消费一次,RocketMQ 采用了以下几种方法:

RocketMQ 消息的每一条都有一个 全局唯一的消息 ID。这个消息 ID 是生产者生成的,消费者可以通过它来判断是否已经消费过该条消息。

// 消息ID,用于避免重复消费Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes);msg.setKeys("msgId_12345"); // 自定义消息ID,确保唯一SendResult sendResult = producer.send(msg);

消费者每次消费完一条消息,都会记录消费进度(通常是消息的偏移量),存储在 RocketMQ 的 消费进度存储 中。消费者恢复时,会从上次消费的进度继续消费,避免重复消费。

// 消费者提交消费进度MessageListenerConcurrently messageListener = (messageExtList, context) -> { for (MessageExt messageExt : messageExtList) { System.out.println("消费消息: " + new String(messageExt.getBody)); // 提交消费进度 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER;};

消费者消费完每条消息后,会通过 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 来标记该消息已成功消费,并提交消费进度。这样即使在消费者故障的情况下,也能避免重复消费。

在实际开发中,确保消费者端具有幂等性非常重要。即使同一条消息被消费者多次消费,最终的效果也应该是一样的。你可以通过在消费者端记录消费过的消息 ID 或其他标识来实现幂等性。例如,如果在数据库中插入数据时,采用唯一索引来避免重复插入。

// 在数据库中插入数据时,通过唯一索引保证幂等性public void consumeMessage(Message message) { String messageId = message.getKeys; // 获取消息ID if (!messageAlreadyConsumed(messageId)) { // 消费消息,插入数据库 insertData(message); } else { // 已经消费过该消息,跳过 System.out.println("消息已被消费,跳过"); }}四、总结一下消息持久化存储:通过磁盘存储,消息即使在系统崩溃后也能恢复。消息确认机制:保证消息已经成功存储,减少了丢失的风险。副本机制:多个副本保证消息的高可用性。幂等性设计:通过消费者端的幂等性处理,即使重复消费,也能保证业务的正确性。

至于如何让你的消息系统更稳健,大家可以根据自己的业务需求适当调节 RocketMQ 的配置,比如消费模式、消息重试机制等。

来源:麻辣小王子

相关推荐