如何在消费端因 Full GC 恢复后,正确调用 recoverLostMessages 来补全漏收的消息,同时避免重复消费。
一、recoverLostMessages 的正确调用时机
recoverLostMessages 不是随便调用的,需要在消费端启动时和检测到 GC 停顿恢复后触发,确保漏收的消息被补全。
1. 核心调用时机
- 消费端启动时:每次消费进程启动(包括 GC 导致的停顿恢复后),先执行一次兜底消息补全;
- 定时检测:每隔一段时间(如 1 分钟)执行一次,兜底偶发的漏消息(可选)。
2. 完整代码示例(含调用逻辑 + 防重复消费)
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
public class MessageConsumer {
// 注入 RedisTemplate(假设你已配置好)
private final RedisTemplate<String, String> redisTemplate;
// 消费频道名称
private static final String CHANNEL = "order_notify";
// 兜底消息的 Redis Key
private static final String BACKUP_KEY = "msg_backup:" + CHANNEL;
// 记录已处理消息的 Redis Key(防重复)
private static final String PROCESSED_MSG_KEY = "processed_msgs:" + CHANNEL;
// 定时任务线程池(用于定时检测漏消息)
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public MessageConsumer(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
// ========== 1. 启动时自动执行:补全漏消息 ==========
@PostConstruct
public void init() {
// 启动时先补全漏消息
recoverLostMessages();
// 定时(每1分钟)检测并补全漏消息,兜底极端情况
scheduler.scheduleAtFixedRate(this::recoverLostMessages, 1, 1, TimeUnit.MINUTES);
// 启动正常的 Pub/Sub 消费逻辑
startNormalConsume();
}
// ========== 2. 核心方法:补全漏消息 + 防重复消费 ==========
public void recoverLostMessages() {
// 步骤1:使用 Redis 原子操作,一次性获取并删除兜底消息(避免多实例重复读取)
// Lua 脚本:先获取 List 所有元素,再删除 Key,保证原子性
String luaScript = """
local msgs = redis.call('LRANGE', KEYS[1], 0, -1)
redis.call('DEL', KEYS[1])
return msgs
""";
DefaultRedisScript<List<String>> script = new DefaultRedisScript<>();
script.setScriptText(luaScript);
script.setResultType(List.class);
// 执行 Lua 脚本,原子获取并清空兜底消息
List<String> lostMsgs = redisTemplate.execute(script, Collections.singletonList(BACKUP_KEY));
// 步骤2:处理漏消息,同时防重复
if (lostMsgs != null && !lostMsgs.isEmpty()) {
for (String msg : lostMsgs) {
// 先检查消息是否已处理,未处理才消费
if (!isMessageProcessed(msg)) {
processMessage(msg);
// 标记消息已处理(设置过期时间,避免 Redis 内存膨胀)
markMessageProcessed(msg);
}
}
}
}
// ========== 3. 防重复消费:检查消息是否已处理 ==========
private boolean isMessageProcessed(String msg) {
// 用消息内容生成唯一标识(如果消息有唯一 ID,直接用 ID 更高效)
String msgId = generateMsgId(msg);
return redisTemplate.hasKey(PROCESSED_MSG_KEY + ":" + msgId);
}
// ========== 4. 防重复消费:标记消息已处理 ==========
private void markMessageProcessed(String msg) {
String msgId = generateMsgId(msg);
// 设置 24 小时过期,避免 Redis 存储大量无用数据
redisTemplate.opsForValue().set(PROCESSED_MSG_KEY + ":" + msgId, "1", 24, TimeUnit.HOURS);
}
// ========== 5. 生成消息唯一标识(核心:确保重复消息的 ID 一致) ==========
private String generateMsgId(String msg) {
// 方案1:如果消息本身有唯一 ID(如 orderId),直接提取(推荐)
// 示例:msg 格式为 "orderId:1001,status:paid",提取 orderId 作为唯一标识
if (msg.contains("orderId:")) {
String[] parts = msg.split(",");
for (String part : parts) {
if (part.startsWith("orderId:")) {
return part.split(":")[1];
}
}
}
// 方案2:无唯一 ID 时,对消息内容做哈希(避免重复)
return UUID.nameUUIDFromBytes(msg.getBytes()).toString();
}
// ========== 6. 正常的 Pub/Sub 消费逻辑 ==========
private void startNormalConsume() {
redisTemplate.convertAndSend(CHANNEL, "test"); // 测试发送
// 订阅消息
redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {
String msg = new String(message.getBody());
// 正常消费时也要先检查是否已处理(避免兜底和正常消费重复)
if (!isMessageProcessed(msg)) {
processMessage(msg);
markMessageProcessed(msg);
}
}, CHANNEL.getBytes());
}
// ========== 7. 实际的业务消息处理逻辑 ==========
private void processMessage(String msg) {
// 你的业务逻辑:如更新订单状态、发送通知等
System.out.println("处理消息:" + msg);
}
// ========== 8. 关闭资源 ==========
public void destroy() {
scheduler.shutdown();
}
}
二、关键逻辑解释(解决你的核心问题)
1. recoverLostMessages 的调用时机
- 启动时调用:通过
@PostConstruct注解,消费端 Bean 初始化时自动执行,覆盖进程重启/GC 恢复后的漏消息; - 定时调用:通过
ScheduledExecutorService每 1 分钟执行一次,兜底偶发的漏消息(如 GC 停顿期间兜底 List 未被及时处理)。
2. 规避重复消费的核心策略
重复消费的根源是:兜底消息和正常 Pub/Sub 消息可能被多次读取,因此需要全局唯一的消息标识 + 已处理消息记录:
- 原子读取兜底消息:使用 Lua 脚本一次性获取并删除兜底 List 的所有消息,避免多实例同时读取导致重复;
- 消息唯一标识:
- 优先用消息自带的唯一 ID(如订单号、消息 ID),这是最高效的方式;
- 无唯一 ID 时,对消息内容做哈希(如 UUID.nameUUIDFromBytes),确保相同消息的 ID 一致;
- 已处理消息缓存:将已处理的消息 ID 存入 Redis(设置 24 小时过期),消费前先检查,已处理则跳过。
3. 额外优化:正常消费时也防重复
即使是正常的 Pub/Sub 消费,也可能因网络重试、进程重启导致重复接收,因此在 startNormalConsume 中,消费消息前同样要检查 isMessageProcessed,确保全链路防重复。
三、补充说明(适配不同场景)
-
如果消息体较大:
- 兜底存储时,不要存完整消息,只存消息唯一 ID,完整消息存在 Redis Hash 中(Key:
msg_content:{msgId}),减少内存占用; - 示例:发布者存储
msg_backup:order_notify(List 存 msgId) +msg_content:1001(Hash 存完整消息)。
- 兜底存储时,不要存完整消息,只存消息唯一 ID,完整消息存在 Redis Hash 中(Key:
-
多实例部署时:
- 兜底 List 的原子读取(Lua 脚本)已保证多实例不会重复读取,无需额外锁;
- 已处理消息的 Redis Key 是全局的,多实例共享,确保重复消息被拦截。
-
GC 停顿检测(进阶):
- 若要精准检测 GC 停顿并触发补全,可通过 JMX 监听 GC 事件,GC 结束后立即调用
recoverLostMessages:// 监听 Full GC 事件 ManagementFactory.getGarbageCollectorMXBeans().forEach(bean -> { bean.addNotificationListener((notification, handback) -> { String gcName = notification.getType(); if (gcName.contains("FullGC") || gcName.contains("G1 Full GC")) { // Full GC 后立即补全消息 recoverLostMessages(); } }, null, null); });
- 若要精准检测 GC 停顿并触发补全,可通过 JMX 监听 GC 事件,GC 结束后立即调用
总结
recoverLostMessages需在消费端启动时和定时/GC 检测后调用,覆盖所有可能漏消息的场景;- 规避重复消费的核心是:原子读取兜底消息 + 全局唯一消息 ID + 已处理消息缓存;
- 该方案是 Redis Pub/Sub 的兜底优化,无法 100% 保证无漏消息,若需绝对可靠,仍需替换为 Redis Stream/Kafka。
注意:本文归作者所有,未经作者允许,不得转载