0%

MQ如何避免消息重复消费

    消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。

    我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢。
    即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一。
    也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
举个例子,一个消息 M 发送到了消息中间件,消息投递到了消费程序 A。A 接受到了消息,然后进行消费。但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
然而这种可靠的特性会导致消息可能被多次地投递,这时候就可能出现程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以它还会继续投递,但这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

简单的消息去重方案

    假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。
insert into t_order values …..update t_inv set count = count-1 where good_id = ‘good123’;
    要实现消息的幂等,只要先判断这个good_id是否存在即可。这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

Exactly Once

    在消息中间件里,有一个投递语义的概念。而这个语义里有一个叫 Exactly Once ,即消息肯定会被成功消费,并且只会被消费一次。
    以下是Exactly Once的通俗解释:
    Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
    在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是 Exactly Once。

    Exactly-Once 语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。
    例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。
    业界对于 Exactly-Once 投递语义存在很大的争议,很多人会拿出“FLP不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的Exactly-Once语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。如果要实现一条消息的消费结果只能在业务系统中生效一次,需要解决的只是如何保证同一条消息的消费幂等问题。

    要实现 Exaclty Once 即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做。
    在数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。流程看起来像是这样的:

  1. 开启事务
  2. 插入消息表
  3. 更新订单表
  4. 提交事务

    这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了。

    这时候就算MQ还没有收到消费位点的更新,从而再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。

    这保证我们消费代码只会执行一次。

    如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功。

    但是这里有它的局限性:消息的消费逻辑必须是依赖于关系型数据库事务
    如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。

    还有,数据库的数据必须是在一个库,跨库无法解决。

    另外,需要特别注意的是,在业务上,消息表的设计不应该以消息 ID 作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。

去事务化的解决方案

    这样的设计解决了三个问题

  • 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  • 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  • 上游业务生产者重发的业务重复的消息幂等问题。

    关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用 MySQL 作为消息表的存储媒介,设置消息的唯一 ID 为主键,那么插入的动作只有一条消息会成功。后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。
    关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是 messageId 即可。所以也不是问题。
    但这样的方案还是会有消息丢失的风险的,当在并发场景下我们依赖于消息状态是做并发控制使得第 2 条消息重复的消息会不断延迟消费,即重试。但如果这时候第 1 条消息也由于一些异常原因,例如机器重启了、外部异常导致消费失败,没有消费成功呢?也就是说这时候延迟消费实际上每次过来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信 Topic 中。
    对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如 10 分钟。意思是如果一个消息处于消费中超过 10 分钟,就需要从消息表中删除,这一点需要程序自行实现。

    这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。
    使用Redis有两个好处:

  • 性能上损耗更低
  • 上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

    当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

一些其他的消息去重的建议

    事实上,这已经能解决 99% 的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

  • 消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
  • 消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
  • 一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)。

    在上面做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好 #1 的回滚,使得下次重试消费成功。