RocketMQ 基本概念

RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。

image-1676515163517

常见RocketMQ需要关注的问题

  • 如何保证消息不丢失
  • 如何保证消息的顺序性
  • 如何保证避免消息的重复
  • 如何处理RocketMQ的消息积压

下面我们就这些问题展开聊聊

如何保证消息不丢失

消息在投递的过程中,主要经历如下三个节点:producer,broker,consumer;每个节点都有可能发生消息的丢失,所以我们需要从每个节点去考虑如何避免。

image-1676516118134

如何防止producer丢失消息

producer 由于网络不可达等原因,有可能发生消息的丢失;producer端发送消息的机制主要有如下三种:

  • 同步发送:重要的通知(订单状态的更新)

  • 异步发送:通常用于响应时间敏感的业务场景

  • OneWay发送:主要用于对可靠性要求不高的场景,在金融的场景下不适用。一般是用于日志收集

根据上面三种发送方式的特点, one-way 的消息发送模式本身就是不对消息的不丢失无法保证。所有如果你的系统对消息丢失零容忍不能使用 one-way 的方式发送。

同步发送消息和异步发送消息 都可以判断消息的发送状态判断消息是否已经发送到Broker。这里是选择同步发送还是异步发送消息看业务的需要,同步发送比较关心发送后返回的结果对时间的要求不是那么敏感。异步发送对消息返回时间敏感。

所以producer端需要采取的策略是:利用同步发送,判断消费结果,失败则重试,集群是多Master多Slave。

如何防止 broker 出现消息丢失

为了防止了解broker出现消息丢失,我们需要先了解一下RocketMQ的集群部署方案。

  • 单Master 这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

  • 多Master模式 全是Master,例如2个Master或者3个Master,这种模式的优缺点如下

    优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

    缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

  • 多Master多Slave模式-异步复制
    每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下

    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;

    缺点:Master宕机,磁盘损坏情况下会丢失少量消息(非同步刷盘的情况下)

  • 多Master多Slave模式-同步双写
    每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

    优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

    缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机

所以要在broker端避免消息丢失,应当采用 多Master多Slave-同步复制的集群。

	# master 节点配置
	flushDiskType = SYNC_FLUSH
	brokerRole=SYNC_MASTER
 
	# slave 节点配置
	brokerRole=slave
	flushDiskType = SYNC_FLUSH

如何避免consumer端消息丢失

consumer端消息丢失一般原因为消费失败,但是返回CONSUMER_SUCCESS;

所以consumer端避免消息丢失应当采用避免消费失败后给broker回复消费成功,同时consumer端应当支持消息的重复消费,因为在消费失败后broker会进行消息重试。

总结

producer 端:使用同步发送的模式,并确认消息发送结果;消息发送失败需要进行重试。

broker端:需要视同多Master多Slave + 同步双写的配置来防止消息的丢失。

consumer端:业务处理正常需要返回正常的消费结果;业务处理失败不可以返回消费成产;同时消费失败以后,应当支持消息的重复消费。

如何保证消息的顺序性

首先要明确要求是全局有序还是业务范围内局部有序。

如果要保证全局有序,就必须使用单个生产者,单个队列;对性能的影响较大;一般这样的场景很少见。

image-1676519268548

其次就是业务范围内局部有序,我们需要从producer,consumer端下手去考虑。

producer端有序

普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。

image-1676519612110

这个问题很好解决,因为RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。

![image-1676519668829](https://www.sunce.wang/upload/2023/02/image-1676519668829.png)

还有produer端的发送需要使用同步模式,因为在异步模式下是没有办法保证消息有序。

还有需要注意的是如果增加或者减少broker的数量;会触发 Rebalance,此时消 费者可能会收到重复消息;可能会造成短暂的消息无序!

消费者端的有序消费

首先我们知道消费者端的消费模式一般有顺序消费和并发消费。再并发消费场景下很难保证消息的顺序消费。

其次,顺序消费通过MessageListenerOrderly来实现;实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。

所以在分布式环境下即使使用了MessageListenerOrderly来实现,消费者的顺序没有办法保证的;所以我们需要引入分布式锁来限制并发的无序消费。

如果业务模型是有状态转换的,比如同一个订单的不同状态的消息,我们可以利用分布式锁+业务的状态机来限制消息只能顺序消费;消息必须要做业务的状态流转过程来消费。

如果没有状态模型的转换,我们则需要在消息的分发(分布式锁),以及获取到消息的消费流程中加多把锁。

问题:

  • 使用了很多的锁,降低了吞吐量。

  • 前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况

如何防止RocketMQ重复消费

  • RocketMQ 消息发送的重复

producer 给broker 发送消息时,由于网络抖动等情况没有收到ack;导致producer认为消息发送失败,此时producer会进行消息的重发;消息的MessageId相同

  • RocketMQ 消息投递的重复

consumer 在消费的过程中,没有给broker发出正确的消费成功,broker端会进行消息发送的重试操作,MessageId相同。

  • RocketMQ broker 负载均衡的重复

在broker增加或者减少机器时,会触发 Rebalance,此时消 费者可能会收到重复消息

防范机制

  • 利用数据库有条件的插入语句限制重复插入

查询消息系统验证消息是否重复。在消费时,通过订阅的记录和消费的结果来判断,此消息是否重复订阅过,倘若重复订阅,则不再数据库中插入数据。

  • 分布式锁(一般是zookeeper和redis搭建)

  • 数据库约束+java异常处理机制。需要针对数据库简历约束,不允许产生重复数据,然后再使用java的异常处理机制来规避重复消息

RocketMQ如何处理消息积压/消息堆积

首先,我们得对自己所用的的RocketMQ集群有个清楚的认识,比如说broker是不是单点的,而producer,consumer又是集群化,这样broker也有可能会有积压;核心是对业务所用的中间件有明确的认识。

其实,如果没有问题,那么问题的本质就是产生消息的速度大于消费消息的速度;

需要关注producer 和 consumer 集群的配比是否合理,比如producer 集群远大于 consumer 集群。

然后需要关注consumer是不是有异常消费的情况,是不是代码bug导致的;如果可以短时间内修复,应该抓紧启动修复机制。如果但时间内无法修复,那么就需要考虑broker有没有开启持久化机制,如果没有开启持久化机制,并且短时间也无法修复consumer的问题,我们就得考虑临时的紧急预案;比如把写一个消费者客户端,只做消息的转发,不做业务处理;将消息转发到临时的消息队列并且开启持久化机制,等业务consumer修复好了,再将消息拿出来。