IIWAB redis pub/sub 消息兜底 - IIWAB

redis pub/sub 消息兜底

IIWAB 17天前 ⋅ 36 阅读

如何在消费端因 Full GC 恢复后,正确调用 recoverLostMessages 来补全漏收的消息,同时避免重复消费。

一、recoverLostMessages 的正确调用时机

recoverLostMessages 不是随便调用的,需要在消费端启动时检测到 GC 停顿恢复后触发,确保漏收的消息被补全。

1. 核心调用时机

  • 消费端启动时:每次消费进程启动(包括 GC 导致的停顿恢复后),先执行一次兜底消息补全;
  • 定时检测:每隔一段时间(如 1 分钟)执行一次,兜底偶发的漏消息(可选)。

2. 完整代码示例(含调用逻辑 + 防重复消费)

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
public class MessageConsumer {
    // 注入 RedisTemplate(假设你已配置好)
    private final RedisTemplate<String, String> redisTemplate;

    // 消费频道名称
    private static final String CHANNEL = "order_notify";
    // 兜底消息的 Redis Key
    private static final String BACKUP_KEY = "msg_backup:" + CHANNEL;
    // 记录已处理消息的 Redis Key(防重复)
    private static final String PROCESSED_MSG_KEY = "processed_msgs:" + CHANNEL;

    // 定时任务线程池(用于定时检测漏消息)
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public MessageConsumer(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // ========== 1. 启动时自动执行:补全漏消息 ==========
    @PostConstruct
    public void init() {
        // 启动时先补全漏消息
        recoverLostMessages();
        // 定时(每1分钟)检测并补全漏消息,兜底极端情况
        scheduler.scheduleAtFixedRate(this::recoverLostMessages, 1, 1, TimeUnit.MINUTES);
        // 启动正常的 Pub/Sub 消费逻辑
        startNormalConsume();
    }

    // ========== 2. 核心方法:补全漏消息 + 防重复消费 ==========
    public void recoverLostMessages() {
        // 步骤1:使用 Redis 原子操作,一次性获取并删除兜底消息(避免多实例重复读取)
        // Lua 脚本:先获取 List 所有元素,再删除 Key,保证原子性
        String luaScript = """
                local msgs = redis.call('LRANGE', KEYS[1], 0, -1)
                redis.call('DEL', KEYS[1])
                return msgs
                """;
        DefaultRedisScript<List<String>> script = new DefaultRedisScript<>();
        script.setScriptText(luaScript);
        script.setResultType(List.class);

        // 执行 Lua 脚本,原子获取并清空兜底消息
        List<String> lostMsgs = redisTemplate.execute(script, Collections.singletonList(BACKUP_KEY));

        // 步骤2:处理漏消息,同时防重复
        if (lostMsgs != null && !lostMsgs.isEmpty()) {
            for (String msg : lostMsgs) {
                // 先检查消息是否已处理,未处理才消费
                if (!isMessageProcessed(msg)) {
                    processMessage(msg);
                    // 标记消息已处理(设置过期时间,避免 Redis 内存膨胀)
                    markMessageProcessed(msg);
                }
            }
        }
    }

    // ========== 3. 防重复消费:检查消息是否已处理 ==========
    private boolean isMessageProcessed(String msg) {
        // 用消息内容生成唯一标识(如果消息有唯一 ID,直接用 ID 更高效)
        String msgId = generateMsgId(msg);
        return redisTemplate.hasKey(PROCESSED_MSG_KEY + ":" + msgId);
    }

    // ========== 4. 防重复消费:标记消息已处理 ==========
    private void markMessageProcessed(String msg) {
        String msgId = generateMsgId(msg);
        // 设置 24 小时过期,避免 Redis 存储大量无用数据
        redisTemplate.opsForValue().set(PROCESSED_MSG_KEY + ":" + msgId, "1", 24, TimeUnit.HOURS);
    }

    // ========== 5. 生成消息唯一标识(核心:确保重复消息的 ID 一致) ==========
    private String generateMsgId(String msg) {
        // 方案1:如果消息本身有唯一 ID(如 orderId),直接提取(推荐)
        // 示例:msg 格式为 "orderId:1001,status:paid",提取 orderId 作为唯一标识
        if (msg.contains("orderId:")) {
            String[] parts = msg.split(",");
            for (String part : parts) {
                if (part.startsWith("orderId:")) {
                    return part.split(":")[1];
                }
            }
        }
        // 方案2:无唯一 ID 时,对消息内容做哈希(避免重复)
        return UUID.nameUUIDFromBytes(msg.getBytes()).toString();
    }

    // ========== 6. 正常的 Pub/Sub 消费逻辑 ==========
    private void startNormalConsume() {
        redisTemplate.convertAndSend(CHANNEL, "test"); // 测试发送
        // 订阅消息
        redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {
            String msg = new String(message.getBody());
            // 正常消费时也要先检查是否已处理(避免兜底和正常消费重复)
            if (!isMessageProcessed(msg)) {
                processMessage(msg);
                markMessageProcessed(msg);
            }
        }, CHANNEL.getBytes());
    }

    // ========== 7. 实际的业务消息处理逻辑 ==========
    private void processMessage(String msg) {
        // 你的业务逻辑:如更新订单状态、发送通知等
        System.out.println("处理消息:" + msg);
    }

    // ========== 8. 关闭资源 ==========
    public void destroy() {
        scheduler.shutdown();
    }
}

二、关键逻辑解释(解决你的核心问题)

1. recoverLostMessages 的调用时机

  • 启动时调用:通过 @PostConstruct 注解,消费端 Bean 初始化时自动执行,覆盖进程重启/GC 恢复后的漏消息;
  • 定时调用:通过 ScheduledExecutorService 每 1 分钟执行一次,兜底偶发的漏消息(如 GC 停顿期间兜底 List 未被及时处理)。

2. 规避重复消费的核心策略

重复消费的根源是:兜底消息和正常 Pub/Sub 消息可能被多次读取,因此需要全局唯一的消息标识 + 已处理消息记录

  • 原子读取兜底消息:使用 Lua 脚本一次性获取并删除兜底 List 的所有消息,避免多实例同时读取导致重复;
  • 消息唯一标识
    • 优先用消息自带的唯一 ID(如订单号、消息 ID),这是最高效的方式;
    • 无唯一 ID 时,对消息内容做哈希(如 UUID.nameUUIDFromBytes),确保相同消息的 ID 一致;
  • 已处理消息缓存:将已处理的消息 ID 存入 Redis(设置 24 小时过期),消费前先检查,已处理则跳过。

3. 额外优化:正常消费时也防重复

即使是正常的 Pub/Sub 消费,也可能因网络重试、进程重启导致重复接收,因此在 startNormalConsume 中,消费消息前同样要检查 isMessageProcessed,确保全链路防重复。

三、补充说明(适配不同场景)

  1. 如果消息体较大

    • 兜底存储时,不要存完整消息,只存消息唯一 ID,完整消息存在 Redis Hash 中(Key:msg_content:{msgId}),减少内存占用;
    • 示例:发布者存储 msg_backup:order_notify(List 存 msgId) + msg_content:1001(Hash 存完整消息)。
  2. 多实例部署时

    • 兜底 List 的原子读取(Lua 脚本)已保证多实例不会重复读取,无需额外锁;
    • 已处理消息的 Redis Key 是全局的,多实例共享,确保重复消息被拦截。
  3. GC 停顿检测(进阶)

    • 若要精准检测 GC 停顿并触发补全,可通过 JMX 监听 GC 事件,GC 结束后立即调用 recoverLostMessages
      // 监听 Full GC 事件
      ManagementFactory.getGarbageCollectorMXBeans().forEach(bean -> {
          bean.addNotificationListener((notification, handback) -> {
              String gcName = notification.getType();
              if (gcName.contains("FullGC") || gcName.contains("G1 Full GC")) {
                  // Full GC 后立即补全消息
                  recoverLostMessages();
              }
          }, null, null);
      });
      

总结

  1. recoverLostMessages 需在消费端启动时定时/GC 检测后调用,覆盖所有可能漏消息的场景;
  2. 规避重复消费的核心是:原子读取兜底消息 + 全局唯一消息 ID + 已处理消息缓存
  3. 该方案是 Redis Pub/Sub 的兜底优化,无法 100% 保证无漏消息,若需绝对可靠,仍需替换为 Redis Stream/Kafka。

全部评论: 0

    我有话说: