SpringBoot+Redis自定义注解实现发布订阅

摘要:如何在 SpringBoot 项目中通过自定义注解实现 Redis 的发布订阅模式。你也许会问,为什么不直接用 Spring Data Redis 自带的功能?嗯,答案是:虽然 Spring 提供了很多便利的功能,但有时候业务需求总是会让我们想要做些“定制化”

如何在 SpringBoot 项目中通过自定义注解实现 Redis 的发布订阅模式。你也许会问,为什么不直接用 Spring Data Redis 自带的功能?嗯,答案是:虽然 Spring 提供了很多便利的功能,但有时候业务需求总是会让我们想要做些“定制化”操作,对吧?

我们今天要做的,就是通过自定义注解和动态代理,来实现一种灵活、可配置的消息流转方式。

最近,我们团队开发了一个内部消息组件,主要是做消息的生产和消费,其中包括 Redis 发布订阅模式。为了解耦和提高可配置性,我们决定用自定义注解 @MessageHub 来管理消息消费。

通过这种方式,我们不需要手动去配置复杂的 Redis 监听器,而是让注解来帮我们自动化这一切。总之,目标就是让这件事情变得更加简洁和灵活!

在没有注解和动态代理的情况下,Redis 的发布订阅模式通常是这么实现的:

**创建消息监听器 MessageListenerAdapter**:这是 Spring 中用于处理 Redis 消息的标准方式,基本上就是将一个消息处理方法适配成 Redis 可以识别的监听器。**创建订阅器 MessageListener**:订阅器会监听 Redis 频道上的消息,接收到消息后调用 onMessage 方法进行处理。**配置 RedisMessageListenerContainer**:通过这个容器来管理所有的消息监听器,并且把它们绑定到 Redis 上。

这种方式看似没啥问题,但缺点就是不够灵活。每次我们都需要在代码里手动配置监听器,所有的逻辑都写死了,无法做到动态配置,也就没法灵活地根据业务需求来调整。

对于我们的项目来说,理想的方案是能够动态地注册和管理 Redis 的发布订阅监听器,而不是一开始就写死监听的逻辑。我们希望能够用注解来标注需要消费消息的方法,然后通过 Spring 的一些特性,动态地将这些方法绑定到 Redis 消息监听器上。这样,既能够减少配置的工作,又能提升系统的灵活性。

那么,如何做到这一点呢?这就是我们今天要解决的问题。

首先来看看我们消息组件的设计。我们的消息组件其实有两个核心方法:生产消息和消费消息。

生产消息:这个方法负责将消息发送到 Redis 频道,使用 StringRedisTemplate.convertAndSend 实现消息的生产。生产者就像是发送者,负责把消息推送到 Redis 上。消费消息:这个方法是比较复杂的,因为我们需要创建一个常驻的消费线程来处理从 Redis 中接收到的消息。我们必须考虑如何动态地注册这些消费线程,并确保能够回调到具体的方法。

现在,我们来讨论如何通过自定义注解来实现动态的消息消费。核心思路很简单:

用 @MessageHub 注解来标记消费消息的方法。Spring 启动时,扫描这些注解并生成相应的 Redis 消息监听器。使用动态代理,将这些方法回调和消息处理结合起来。扫描 Bean 和方法:我们使用 BeanPostProcessor#postProcessAfterInitialization 来扫描 Spring 容器中的所有 Bean,寻找那些标注了 @MessageHub 注解的方法。动态构建监听器:根据方法信息,动态创建一个 MessageListenerAdapter,并将它注册到 RedisMessageListenerContainer 中。回调方法设置:为了确保监听器能正常工作,我们需要手动调用 afterPropertiesSet 方法来初始化监听器的属性,确保消息能正确被消费。

接下来,我们来实现一个 Redis 发布订阅的处理器 RedisPubSubProcessor。这个处理器主要负责消息的生产和消费。

生产消息:通过 stringRedisTemplate.convertAndSend 发送消息到指定的 Redis 频道。消费消息:首先,动态创建 MessageListenerAdapter,然后将其注册到 RedisMessageListenerContainer 中。每当消息被发布到频道时,MessageListenerAdapter 就会触发回调方法,执行我们定义好的消费逻辑。// 动态创建消息监听器MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(delegateObject, "processMessage");RedisMessageListenerContainer container = new RedisMessageListenerContainer;container.addMessageListener(listenerAdapter, new ChannelTopic("myTopic"));

这段代码其实就是我们如何将消息处理方法和 Redis 的订阅机制绑定在一起。通过 MessageListenerAdapter,我们可以让 Redis 消息直接触发到我们定义的方法上。

MessageListenerAdapter 需要两个参数,一个是 delegate 对象,另一个是方法名。这个构造器让我们能够指定哪个方法来处理收到的消息。

// 构造一个消息监听器,指定处理方法MessageListenerAdapter adapter = new MessageListenerAdapter(myBean, "handleMessage");

RedisMessageListenerContainer 是 Spring 提供的一个容器,用于管理 Redis 消息的监听和分发。它需要我们将监听器和 Redis 频道绑定。

// 配置 Redis 消息监听容器RedisMessageListenerContainer container = new RedisMessageListenerContainer;container.addMessageListener(listenerAdapter, new ChannelTopic("myChannel"));生命周期管理

为了避免空指针错误,我们需要确保监听器的属性在容器初始化时被正确设置。这时,afterPropertiesSet 方法非常有用,它能保证监听器在实际使用之前就已经完成初始化。

// 手动初始化监听器的相关属性listenerAdapter.afterPropertiesSet;

通过自定义注解和动态代理,我们实现了一个更加灵活的 Redis 发布订阅模式。这种方式不仅使得代码更加简洁,也能根据不同的需求动态注册和管理消息的消费方法。更重要的是,这个方案不仅可以适配 Redis,还可以扩展到其他消息中间件,例如 Kafka 等。

未来,随着需求的变化,我们还可以进一步优化这个方案,支持更多的消息传输协议和方式。

来源:麻辣小王子

相关推荐