消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
简单的消息去重解决方案
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
select * from t_order where order_no = 'order123'
if(order != null) {
return ;//消息重复,直接返回
}
并发重复消息
并发去重的解决方案之一
select * from t_order where order_no = 'THIS_ORDER_NO' for update //开启事务
if(order.status != null) {
return ;//消息重复,直接返回
}
Exactly Once
Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
基于关系数据库事务插入消息表
update t_order set status = 'SUCCESS' where order_no= 'order123';
开启事务 插入消息表(处理好主键冲突的问题) 更新订单表(原消费逻辑) 提交事务
这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。
如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。
消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。 数据库的数据必须是在一个库,跨库无法解决
更复杂的业务场景
检查库存(RPC) 锁库存(RPC) 开启事务,插入订单表(MySQL) 调用某些其他下游服务(RPC) 更新订单状态 commit 事务(MySQL)
拆解消息执行过程
库存系统消费A:检查库存并做锁库存,发送消息B给订单服务 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统 订单系统消费消息D:更新订单状态
更通用的解决方案
基于消息幂等表的非事务方案
消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。 支持上游业务生产者重发的业务重复的消息幂等问题。
此方案是否有消息丢失的风险?
更灵活的消息表存储媒介
性能上损耗更低 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现
源码:RocketMQDedupListener
//利用Redis做幂等表
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");
String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名
StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);
consumer.registerMessageListener(messageListener);
consumer.start();
DedupConcurrentListener
示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。这种实现是否一劳永逸?
检查库存(RPC)
锁库存(RPC)
开启事务,插入订单表(MySQL)
调用某些其他下游服务(RPC)
更新订单状态
commit 事务(MySQL)
本实现方式的价值?
一些其他的消息去重的建议
消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等) 在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功。