RocketMQ 基本概念
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
常见RocketMQ需要关注的问题
- 如何保证消息不丢失
- 如何保证消息的顺序性
- 如何保证避免消息的重复
- 如何处理RocketMQ的消息积压
下面我们就这些问题展开聊聊
如何保证消息不丢失
消息在投递的过程中,主要经历如下三个节点:producer,broker,consumer;每个节点都有可能发生消息的丢失,所以我们需要从每个节点去考虑如何避免。
如何防止producer丢失消息
producer 由于网络不可达等原因,有可能发生消息的丢失;producer端发送消息的机制主要有如下三种:
-
同步发送:重要的通知(订单状态的更新)
-
异步发送:通常用于响应时间敏感的业务场景
-
OneWay发送:主要用于对可靠性要求不高的场景,在金融的场景下不适用。一般是用于日志收集
根据上面三种发送方式的特点, one-way 的消息发送模式本身就是不对消息的不丢失无法保证。所有如果你的系统对消息丢失零容忍不能使用 one-way 的方式发送。
同步发送消息和异步发送消息 都可以判断消息的发送状态判断消息是否已经发送到Broker。这里是选择同步发送还是异步发送消息看业务的需要,同步发送比较关心发送后返回的结果对时间的要求不是那么敏感。异步发送对消息返回时间敏感。
所以producer端需要采取的策略是:利用同步发送,判断消费结果,失败则重试,集群是多Master多Slave。
如何防止 broker 出现消息丢失
为了防止了解broker出现消息丢失,我们需要先了解一下RocketMQ的集群部署方案。
-
单Master 这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
-
多Master模式 全是Master,例如2个Master或者3个Master,这种模式的优缺点如下
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
-
多Master多Slave模式-异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
缺点:Master宕机,磁盘损坏情况下会丢失少量消息(非同步刷盘的情况下)
-
多Master多Slave模式-同步双写
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机
所以要在broker端避免消息丢失,应当采用 多Master多Slave-同步复制的集群。
# master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
# slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH
如何避免consumer端消息丢失
consumer端消息丢失一般原因为消费失败,但是返回CONSUMER_SUCCESS;
所以consumer端避免消息丢失应当采用避免消费失败后给broker回复消费成功,同时consumer端应当支持消息的重复消费,因为在消费失败后broker会进行消息重试。
总结
producer 端:使用同步发送的模式,并确认消息发送结果;消息发送失败需要进行重试。
broker端:需要视同多Master多Slave + 同步双写的配置来防止消息的丢失。
consumer端:业务处理正常需要返回正常的消费结果;业务处理失败不可以返回消费成产;同时消费失败以后,应当支持消息的重复消费。
如何保证消息的顺序性
首先要明确要求是全局有序还是业务范围内局部有序。
如果要保证全局有序,就必须使用单个生产者,单个队列;对性能的影响较大;一般这样的场景很少见。
其次就是业务范围内局部有序,我们需要从producer,consumer端下手去考虑。
producer端有序
普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。
这个问题很好解决,因为RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。

还有produer端的发送需要使用同步模式,因为在异步模式下是没有办法保证消息有序。
还有需要注意的是如果增加或者减少broker的数量;会触发 Rebalance,此时消 费者可能会收到重复消息;可能会造成短暂的消息无序!
消费者端的有序消费
首先我们知道消费者端的消费模式一般有顺序消费和并发消费。再并发消费场景下很难保证消息的顺序消费。
其次,顺序消费通过MessageListenerOrderly来实现;实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。
所以在分布式环境下即使使用了MessageListenerOrderly来实现,消费者的顺序没有办法保证的;所以我们需要引入分布式锁来限制并发的无序消费。
如果业务模型是有状态转换的,比如同一个订单的不同状态的消息,我们可以利用分布式锁+业务的状态机来限制消息只能顺序消费;消息必须要做业务的状态流转过程来消费。
如果没有状态模型的转换,我们则需要在消息的分发(分布式锁),以及获取到消息的消费流程中加多把锁。
问题:
-
使用了很多的锁,降低了吞吐量。
-
前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况
如何防止RocketMQ重复消费
- RocketMQ 消息发送的重复
producer 给broker 发送消息时,由于网络抖动等情况没有收到ack;导致producer认为消息发送失败,此时producer会进行消息的重发;消息的MessageId相同
- RocketMQ 消息投递的重复
consumer 在消费的过程中,没有给broker发出正确的消费成功,broker端会进行消息发送的重试操作,MessageId相同。
- RocketMQ broker 负载均衡的重复
在broker增加或者减少机器时,会触发 Rebalance,此时消 费者可能会收到重复消息
防范机制
- 利用数据库有条件的插入语句限制重复插入
查询消息系统验证消息是否重复。在消费时,通过订阅的记录和消费的结果来判断,此消息是否重复订阅过,倘若重复订阅,则不再数据库中插入数据。
-
分布式锁(一般是zookeeper和redis搭建)
-
数据库约束+java异常处理机制。需要针对数据库简历约束,不允许产生重复数据,然后再使用java的异常处理机制来规避重复消息
RocketMQ如何处理消息积压/消息堆积
首先,我们得对自己所用的的RocketMQ集群有个清楚的认识,比如说broker是不是单点的,而producer,consumer又是集群化,这样broker也有可能会有积压;核心是对业务所用的中间件有明确的认识。
其实,如果没有问题,那么问题的本质就是产生消息的速度大于消费消息的速度;
需要关注producer 和 consumer 集群的配比是否合理,比如producer 集群远大于 consumer 集群。
然后需要关注consumer是不是有异常消费的情况,是不是代码bug导致的;如果可以短时间内修复,应该抓紧启动修复机制。如果但时间内无法修复,那么就需要考虑broker有没有开启持久化机制,如果没有开启持久化机制,并且短时间也无法修复consumer的问题,我们就得考虑临时的紧急预案;比如把写一个消费者客户端,只做消息的转发,不做业务处理;将消息转发到临时的消息队列并且开启持久化机制,等业务consumer修复好了,再将消息拿出来。
- 本文链接: https://www.sunce.wang/archives/rokcetmq-chang-jian-wen-ti
- 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!