# 1. RocketMQ概念术语
生产者和消费者
消息模型(Message Model)
- 消息模型主要有队列模型和发布订阅模型,RabbitMQ采用的是 队列模型,RocketMQ采用发布订阅模型。
消息存储
主题(Topic) (opens new window):Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
队列(MessageQueue) (opens new window):Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
消息(Message) (opens new window):Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。
主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能 属于一个主题,是RocketMQ进行消息订阅的基本单位
代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息
名字服务(Name Server):名称服务器管理代理服务器
生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送 逻辑一致。
消费者组:同一类Consumer的集合,这类Consumer通常消费同一类消息 且消费逻辑一致。
拉取式消费(Pull Consumer):Consumer消费的一种类型,应用通常主动调用Consumer的拉 消息方法从Broker服务器拉消息、主动权由应用控制。一旦获 取了批量消息,应用就会启动消费过程。
推动式消费:Consumer消费的一种类型,该模式下Broker收到数据后会主动 推送给消费端,该消费模式一般实时性较高。
普通顺序消息(Normal Ordered Message):普通顺序消费模式下,消费者通过同一个消息队列( Topic 分 区,称作 Message Queue) 收到的消息是有顺序的,不同消息 队列收到的消息则可能是无顺序的。
严格顺序消息:严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
# 1.1 消息消费
- 消费者分组:RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
- 消费者:RocketMQ消费消息的实体,消费者必须被指定到一个消费组中。
- 订阅关系:RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。
# 1.2 消息传输模型介绍
# 1.2.1 点对点模型
特点:
- 消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份
- 一对一通信:基于消费匿名特点,但都没有自己独立的身份,因此共享队列中的消息,每一条消息只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。
# 1.2.2 发布订阅模型
特点:
- 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费房都会具备身份,一般叫做(订阅关系),不同订阅组之间相互独立不会互相影响
- 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每隔订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
# 1.3 主题(Topic)
定义:RocketMQ中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息
作用:
- 定义数据的分类隔离:建议将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。
- 定义数据的身份和权限:RocketMQ 的消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。
主题内部由多个队列组成,消息的存储和水平扩展能力最终是由队列实现的。并且针对主题的所有约束和属性设置,最终也是通过主题内部的队列来实现。
消息类型:
- Normal:普通消息 (opens new window),消息本身无特殊语义,消息之间也没有任何关联。
- FIFO:顺序消息 (opens new window),Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
- Delay:定时/延时消息 (opens new window),通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
- Transaction:事务消息 (opens new window),Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。
行为约束:
**消息类型强制校验:**系统会对发送的消息类型和主题定的消息类型进行强制校验,若校验不通过,则消息发送请求会被拒绝
- 消息类型必须一致发送的消息的类型,必须和目标主题定义的消息类型一致。
- 主题类型必须单一每个主题只支持一种消息类型,不允许将多种类型的消息发送到同一个主题中。
# 1.4 队列
队列是RocketMQ中消息存储和传输的实际容器,也是RocketMQ消息的最小存储单元,所有主题都是由多个队列组成,由此实现队列数量的水平拆分和队列内部的流式存储
队列的主要作用:
- 存储顺序性:队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列间的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息。消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理。
- 流式操作语义:RocketMQ基于队列的存储模型可确保消息从任意位点读取任意数量的消息。
# 1.4.1 内部属性
读写权限:
定义:当前毒烈是否可以读写数据
取值:由服务端定义,枚举值如下
- 6:读写状态,当前队列允许读取消息和写入消息。
4:只读状态,当前队列只允许读取消息,不允许写入消息。
2:只写状态,当前队列只允许写入消息,不允许读取消息。
0:不可读写状态,当前队列不允许读取消息和写入消息。
约束:队列的读写权限属于运维侧操作,不建议频繁修改。
版本兼容性:
- 服务端3.x/4.x版本:队列名称由{主题名称}+{BrokerID}+{QueueID}三元组组成,和物理节点绑定。
- 服务端5.x版本:队列名称为一个集群分配的全局唯一的字符串组成,和物理节点解耦。
# 1.5 消息(Message)
消息是RocketMQ中的最小数据传输单元。
消息模型特点:
- 消息不可变形:消息本质上是已经产生并确定的事件,一旦产生后,消息的内容不会发生改变。
- 消息持久化:Apache RocketMQ会默认对消息进行持久化
# 1.5.1 消息内部属性
主题名称:
- 定义:当前消息所属的主题的名称。集群内全局唯一。
- 取值:从客户端SDK接口获取。
消息类型:
定义:当前消息的类型
取值:
- Normal:普通消息 (opens new window),消息本身无特殊语义,消息之间也没有任何关联。
- FIFO:顺序消息 (opens new window),Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
- Delay:定时/延时消息 (opens new window),通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
- Transaction:事务消息 (opens new window),Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。
消息位点:
- **定义:**当前消息存储在队列中的位置
- 取值:0-long.max
消息ID
- 消息的唯一标识
- 取值:生产者客户端系统自动生成。固定为数字和大写字母组成的32位字符串。
索引Key列表(可选)
- 定义:消息的索引键,可以通过设置不同的Key区分消息和快速查找消息
- 取值:由生产者客户端定义
过滤标签Tag()
- 定义:消息的过滤标签。消费者可通过Tag对消息进行过滤,仅接受指定标签的内容
- 取值:又生产者客户端定义
- 约束:一条消息仅支持设置一个标签
定时时间(可选)
- 定义:定时场景下,消息触发延时投递的毫秒级时间戳
- 取值:由消息生产者定义
- 约束:最大可设置定时时长40天
消息发送时间
- 定义:消息发送时,生产者客户端本地毫秒级时间戳
- 取值:由生产者客户端系统填充
- **说明:**客户端系统时钟和服务端系统时钟可能存在偏差,消息发送时间是以客户端系统时钟为准。
消息保存时间戳:
- 定义:服务端完成存储时,服务端系统的本地毫秒级时间戳,对于定时消息和事物消息,消息保存时间指的是消息生效对消费房可见的服务端系统时间。
- 取值:由服务端系统填充
消费重试次数
- 定义:消息消费失败后,服务端重新投递的次数。
- 取值:由服务端系统标记。
业务自定义属性
- 定义:生产者可以自定义设置的扩展信息
- 取值:由消息生产者自定义,按照字符串键值对设置
消息负载
- 定义:业务消息的实际报文数据
- 取值:由生产者负责序列化编码,按照二进制字节传输
消息大小不得超过其类型所对应的限制,否则消息会发送失败。
系统默认的消息最大限制如下:
- 普通和顺序消息:4 MB
- 事务和定时或延时消息:64 KB
# 1.6 生产者(Producer)
消息生产者中,可以定义如下传输行为:
- 发送方式:生产者可通过API接口设置消息发送的方式。Apache RocketMQ 支持同步传输和异步传输。
- 批量发送:生产者可通过API接口设置消息批量传输的方式。例如,批量发送的消息条数或消息大小。
- 事务行为:Apache RocketMQ 支持事务消息,对于事务消息需要生产者配合进行事务检查等行为保障事务的最终一致性。
生产者和主题的关系为多对多关系
# 1.6.1 内部属性
客户端ID:生产者客户端的标识,用于区分不同的生产者。集群内全局唯一。
预绑定主题列表
定义:Apache RocketMQ 的生产者需要将消息发送到的目标主题列表,主要作用如下:
- 事务消息 (必须设置) :事务消息场景下,生产者在故障、重启恢复时,需要检查事务消息的主题中是否有未提交的事务消息。避免生产者发送新消息后,主题中的旧事务消息一直处于未提交状态,造成业务延迟。
非事务消息 (建议设置) :服务端会在生产者初始化时根据预绑定主题列表,检查目标主题的访问权限和合法性,而不需要等到应用启动后再检查。若未设置,或后续消息发送的目标主题动态变更, Apache RocketMQ 会对目标主题进行动态补充检验。
约束:对于事务消息,预绑定列表必须设置,且需要和事务检查器一起配合使用。
# 1.7 消费者分组(ConsumerGroup)
消费者组是RocketMQ系统中承载多个消费行为一致的消费者的负载均衡
和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
在消费者分组中,统一定义以下消费行为,同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息。
- 订阅关系:RocketMQ以消费者分组的粒度管理订阅关系
- 投递顺序:RocketMQ的服务端将消息投递给消费者消费时,支持顺序投递和并发投递
- 消费重试策略
- 消费者的投递顺序一致:同一消费者分组下所有消费者的消费投递顺序是相同的,统一都是顺序投递或并发投递,不同业务场景不能混用消费者分组。
- 消费者业务类型一致:一般消费者分组和主题对应,不同业务域对消息消费的要求不同,例如消息过滤属性、消费重试策略不同。因此,不同业务域主题的消费建议使用不同的消费者分组,避免一个消费者分组消费超过10个主题。
# 1.8 消费者(Consumer)
消费者是RocketMQ中用来接受并处理消息的运行实体
在消息消费端,可以定义如下传输行为:
- 消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态
- 消费者类型
- 消费者本地运行配置:
# 1.9 订阅关系
订阅关系是RocketMQ系统中消费者获取消息、处理消息的规则和状态配置。
订阅关系由消费者分组动态注册到服务端系统。并在后续的消息传输中订阅关系定义的过滤规则进行消息匹配和消费进度维护。 通过配置订阅关系,可控制如下传输行为
- 消息过滤规则
- 消费状态
# 1.10 消费者分类
**PushConsumer:**PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。
SimpleConsumer:SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。
# 1.11 RocketMQ中PushConsumer、SimpleConsumer、PullConsumer的关系
- PushConsumer:PushConsumer是常用的消费者类型,它可以自动从服务器拉取消息,也可以由服务器推送消息到客户端。其工作方式类似于事件驱动,当有新消息到达时会立即触发消费操作。
- SimpleConsumer:SimpleConsumer是一种低级别的消费者类型,它需要手动控制消息的拉取过程,并且不能处理集群模式下的消息负载均衡。因此,SimpleConsumer比较适合对消息实时性要求较高、但消费量不大的场景。
- PullConsumer:PullConsumer是另一种手动控制消息拉取过程的消费者类型,与SimpleConsumer类似,但支持集群模式下的消息负载均衡。使用PullConsumer时需要调用fetch方法来获取消息,可以按照某个时间点或消息ID等条件进行拉取。
# 1.12消费者负载均衡
根据消费者类型的不同,消费者负载均衡策略分为以下两种模式:
- 消息粒度负载均衡 (opens new window):PushConsumer和SimpleConsumer默认负载策略
消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。
顺序消息负载机制
顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费
- 队列粒度负载均衡 (opens new window):PullConsumer默认负载策略
队列粒度负载均衡策略中,同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列仅被一个消费者消费。
# 2. RocketMQ架构
- producer:消息发布的角色
- Consumer:消息消费的角色-推拉俩种消费消息模式
- NameServer:管理Broker代理服务器
- BrokerServer:rocketMQ的核心,负责消息的接收和转发
RocketMQ 网络部署特点:
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个 Master,Master与Slave 的对应关系通过指定相同的 BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。
Producer完全无状态,可集群部署。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、 Slave发送心跳。Consumer既可以从Master订阅消息,也可以 从Slave订阅消息。
工作流程:
- 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一 个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
# 3. RokcetMQ的高级特性
# 3.1 消息存储
采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。
# 3.2 负载均衡
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
# 3.3 事务消息
采用XA分布式事务模式,保证最终一致性。
RocketMQ采用了2PC的思想实现了提交事务消息,同时增加了一个补偿逻辑处理二阶段超时或者失败的消息
事务消息发送步骤如下:
生产者将半事务消息发送至消息队列RocketMQ服务端。 消息队列RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
生产者开始执行本地事务逻辑。生产者根据本地事务执行结果向服务端提交二次确认结果 (Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
二次确认结果为Rollback:服务端不会将该消息投递给消费者,并按照如下逻辑进行回查处理。
事务消息回查步骤如下: 在断网或者是生产者应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
# 3.4 顺序消息
消息有序指的是按照消息的发送顺序来消费(FIFO),RocketMQ可以保证消息有序,消息有序分为部分有序和全局有序。全局有序是指某个Topic下的所有消息都要保证顺序,部分有序只要保证每一组被顺序消费即可。
# 3.5 消息重试
# 3.5.1 生产端重试:
发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时 RocketMQ会自动进行重试。
# 3.5.2 消费端重试
- 同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给 Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。
- 顺序消息重试
// 同步发送消息,如果5秒内没有发送成功,则重试3次 对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生无序消息重试
- 对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,可以通过设置返回状态达到消息重试的结果。
# 3.6延迟消息
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX, 将消息写入真实的topic。
# 3.7消息查询
# 4. NameServer
消息客户端与NameServer、Broker的交互
- Broker每隔30s向NameServer集群的每一台机器发送心跳包, 包含自身创建的topic路由等信息。
- 消息客户端每隔30s向NameServer更新对应topic的路由信息。
- NameServer收到Broker发送的心跳包时会记录时间戳。
- NameServer每隔10s会扫描一次brokerLiveTable(存放心跳包 的时间戳信息),如果在120s内没有收到心跳包,则认为Broker失 效,更新topic的路由信息,将失效的Broker信息移除。
RocketMQ基于订阅发布机制,一个topic拥有多个消息队列,一个 Broker默认为每一主题创建4个读队列和4个写队列。多个Broker组成 一个集群,BrokerName由相同的多台Broker组成主从架构, brokerId=0代表主节点,brokerId>0表示从节点。BrokerLiveInfo中 的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
NameServer处理心跳包
**路由发现:**RocketMQ路由发现是非实时的,当topic路由出现变化后, NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的 路由。
# 5. RocketMQ消息发送
RocketMQ发送普通消息有3种实现方式:可靠同步发送、可靠异步 发送和单向发送。
# 5.1 漫谈RocketMQ消息发送
RocketMQ支持3种消息发送方式:同步,异步和单向
- 同步:发送者向RocketMQ执行发送消息API时,同步等待,直到消息服务器返回发送结果
- 异步:发送者向RocketMQ执行发送消息API时,指定消息发送 成功后的回调函数,调用消息发送API后,立即返回,消息发送者线程 不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的 线程中执行。
- 单向:消息发送者向RocketMQ执行发送消息API时,直接返 回,不等待消息服务器的结果,也不注册回调函数。简单地说,就是 只管发,不在乎消息是否成功存储在消息服务器上。
# 5.1.1 topic路由机制
消息发送者像某一个topic发送消息时,需要查询topic的路由信息。初次发送时会根据名称向NameServer集群查询topic的路由信息,然后将其存储在本地内存缓存中。每隔30s依次便利缓存中的topic,向NamesServer查询最新的路由信息。如果成功查询到路由消息,更新至本地缓存。实现topic路由信息的动态感知。
RocketMQ提供了自动创建主题(topic)的机制。生产者发像一个不存在的主题发送消息时,向NameServer查询该主题的路由信息会先返回空。如果开启了自动创建主题机制。会使用一个默认的主题名 再次从NameServer查询路由信息,然后消息发送者会使用默认主题的 路由信息进行负载均衡,但不会直接使用默认路由信息为新主题创建 对应的路由信息。
RocketMQ中的路由消息是持久化在Broker中的,NameServer中的路由 信息来自Broker的心跳包并存储在内存中。
# 5.1.2 消息发送高可用设计
发送端在自动发现主题的路由信息后,RocketMQ默认使用轮训算法进行路由的负载均衡。RocketMQ发送消息时支持自定义的队列负载算法。但是RocketMQ的重试机制将失效。
为了实现消息发送高可用,引入了俩个重要的特性。
- 消息发送重试机制
- 鼓掌规避机制
# 5.2 认识RocketMQ消息
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
this.topic = topic;
this.flag = flag;
this.body = body;
if (tags != null && tags.length() > 0) {
this.setTags(tags);
}
if (keys != null && keys.length() > 0) {
this.setKeys(keys);
}
this.setWaitStoreMsgOK(waitStoreMsgOK);
}
tags:消息tag,用于消息过滤
keys:消息索引键,用空格隔开,RocketMQ可以根据这些Key快速检索消息
waitStoreMsgOK:消息发送时是否等消息存储完成在返回。
# 5.3 生产者启动流程
DefaultMQProducer是默认的消息生产者实现类,实现了MQAdmin 的接口
# 5.4消息发送基本流程
消息发送流程主要的步骤为验证消息、超找路由、消息发送。
默认消息发送以同步方式发送,默认超市时间为3s。
# 5.4.1 消息长度验证
具体的规范要求是主题名称、消息体不能为 空,消息长度不能等于0且默认不能超过允许发送消息的最大长度 4MB(maxMessageSize=1024×1024×4)
# 5.4.2 查找主题路由信息
如果生产这种混存了topic的路由信息,且该路由信息包含消息队列,则直接返回该路由信息。如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。如果最终未找到路由信息,则抛出异常。
# 5.4.3 选择消息队列
根据路由信息选择消息队列,返回的消息队列按照broker序号进行排序。举例说明,如果topicA在broker-a、broker-b上分别创建了4 个队列,那么返回的消息队列为[{"brokerName":"brokera"、"queueId":0}、{"brokerName":"broker-a"、"queueId":1}、 {"brokerName":"broker-a"、"queueId":2}、 {"brokerName":"broker-a"、"queueId":3}、 {"brokerName":"broker-b"、"queueId":0}、 {"brokerName":"broker-b"、"queueId":1}、 {"brokerName":"broker-b"、"queueId":2}、 {"brokerName":"broker-b"、"queueId":3}],那么RocketMQ如何选择 消息队列呢?
首先消息发送端采用重试机制,由retryTimesWhenSendFailed指 定同步方式重试次数,异步重试机制在收到消息发送结果执行回调之 前进行重试,由retryTimesWhenSend AsyncFailed指定异常重试次数。
接下来就是循环执行,选择消息队列、发送消息。选择消息队列有俩种方式
- sendLatencyFaultEnable=false,默认不启用Broker故障延迟
- sendLatencyFaultEnable=true,启用Broker故障延迟机制。
所谓开启的故障延迟机制,即设置endLatencyFaultEnable=true 其实是一种较为悲观的做法,当消息发送者遇到一次消息发送失败后,就会悲观地认为Broker不可用。在接下来的一段时间内就不再向其发送消息,直接避开该Broker。而不开启延迟规避机制,就只会在本次消息发送的重试过程中规避Broker,下一次消息发送还会继续尝试。
# 5.4.4 批量消息发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。
# 6. RocketMQ消息存储
# 6.1 存储概要设计
RocketMQ存储的文件主要包括CommitLog文件、Consumer Queue文件、Index文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时按顺序写文件。
因为消息中间件一般是基于消息主题的订阅机制,所以给 按照消息主题检索消息带来了极大的不便。
RocketMQ引入了ConsumeQueue消息消费队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。Index索引文件就是为了加速消息的检索性能。根据消息的属性从 CommitLog文件中快速检索消息。
RocketMQ数据流向
- CommitLog:消息存储,所有消息主题的消息都存储在CommitLog文件中。
- ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到ConsumeQueue文件中,供消息消费者消费。
- Index:消息索引,主要存储消息key与offset的对应关系
# 6.1.1 RocketMQ存储文件的组织方式
RocketMQ消息写入过程中追求极致的磁盘顺序写,所有主题的消息全部写入一个文件,即CommitLog文件。
基于文件编程 也会为每条消息引入一个身份标志:消息物理偏移量,即消息存储在 文件的起始位置。
这样做的好处就是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置。然后用消息偏移量减去文件的名称。得到该文件的绝对地址。
消息消费模型是基于主题订阅机制额,即一个消费组是消费特定主题的消息。为了解决基于topic的消息检索问题,RocketMQ引入了ConsumerQUeue文件。
ConcumeQueue文件是消息消费队列文件,是CommitLog文件基于topic的索引文件,主要用于消费者根据topic消费消息,其组织方式 为/topic/queue,同一个队列中存在多个消息文件。
如果想基于消息的某一个属性进行查找,ConsumeQueue文件无能为力了。因此RocketMQ又引入了Index索引文件,实现基于文件的哈希索引。
Index文件基于物理磁盘文件实现哈希索引。Index文件由40字节 的文件头、500万个哈希槽、2000万个Index条目组成,每个哈希槽4字 节、每个Index条目含有20个字节,分别为4字节索引key的哈希码、8 字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目(哈希 冲突的链表结构)。
# 6.1.2 灵活多变的刷盘策略
有了顺序写和内存映射的加持,RocketMQ的写入性能得到了极大 的保证,但凡事都有利弊,引入了内存映射和页缓存机制,消息会先 写入页缓存,此时消息并没有真正持久化到磁盘。那么Broker收到客 户端的消息后,是存储到页缓存中就直接返回成功,还是要持久化到 磁盘中才返回成功呢?
RocketMQ提供了俩种刷盘策略:
- 同步刷盘:
同步刷盘在RocketMQ的视线中称做组提交。
同步刷盘的优点是能保证消息不丢失,即向客户端返回成功就代 表这条消息已被持久化到磁盘,但这是以牺牲写入性能为代价的,不 过因为RocketMQ的消息是先写入pagecache,所以消息丢失的可能性较 小,如果能容忍一定概率的消息丢失或者在丢失后能够低成本的快速 重推,可以考虑使用异步刷盘策略。
- 异步刷盘
异步刷盘指的是broker将消息存储到pagecache后就立即返回成功。然后开启一个异步线程定时执行FileChannel的force方法,将内存中的数据定时写入磁盘,默认间隔时间是500ms
# 6.1.4 transientStorePoolEnable机制
RocketMQ为了降低pagecache的使用压力,引入了 transientStorePoolEnable机制,即内存级别的读写分离机制。
默认情况下,RocketMQ将消息写入pagecache,消息消费从pagecache中读取,这样在高并发时pagecache的压力会比较大,容易 出现瞬时broker busy的异常。
**transientStorePoolEnable机制,将消息先写入堆外内存并立即返回,然后一步将堆外内存中的数据提交到pagecache,在异步刷盘到磁盘中。因为堆外内存中的数据并未提交,认为这是不可信的数据,消息在消费时不会从堆外内存中读取,而是从pagecache中读取,**这样 就形成了内存级别的读写分离,即写入消息时主要面对堆外内存,而 读取消息时主要面对pagecache。
# 6.1.5 文件恢复机制
在RocketMQ中有broker异常停止恢复和正常停止恢复两种场景。 这两种场景的区别是定位从哪个文件开始恢复的逻辑不一样,大致思 路如下。
尝试恢复ConsumeQueue文件,根据文件的存储格式,找到最后一条完整的消息格式对应的物理偏移量。用maxPhysical OfConsumequeue表示
尝试恢复CommitLog文件,先通过文件的魔数判断该文件是否为ComitLog文件,然后按照消息的存储格式寻找最后一条合格的消息,拿到其偏移量
- 如果ComiitLog文件的有效偏移量小于ConsumerQueue文件存储的最大物理偏移量,将会删除ConsumeQueue中多余的内容,如果大于,说明ConsumeQueue文件存储的内容少于CommitLog文件,则会重推数据。
如何定位要恢复的文件
- 正常停止刷盘的情况下,先从倒数第三个文件开始进行恢复,然后按照消息的存储格式进行查找。如果该文件中所有的消息都符合消息存储格式,则继续查找下一额文件,直到找到最后一条消息所在的位置
- 异常停止刷盘的情况下,RocketMQ会借助检查点文件,即存储的 刷盘点,定位恢复的文件。刷盘点记录的是CommitLog、 ConsuemQueue、Index文件最后的刷盘时间戳,从最后一个文件开始寻找,如果该文件第一条消息的存储时间小于检查点文件中的刷盘时间,就可以从这个文件开始恢复,如果大于刷盘点,需要寻找上一个文件,因为检查点文件中的刷盘点代表的是100%可靠的消息。
# 6.2 存储文件组织与内存映射
RocketMQ通过使用内存映射文件来提高I/O访问性能,无论是CommitLog、Consume-Queue还是Index,单个文件都被设计为固定长度,一个文件写满以后在创建新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。
# 6.2.1 MappedFileQueue映射文件队列
MappedFileQueue是MappedFile的管理容器,MappedFileQueue对 存储目录进行封装。该目录下会存在多个内存映射文件MappedFile。
# 6.2.2 MappedFile内存映射文件
MappedFile是RocketMQ内存映射文件的具体实现
# 6.2.3 TransientStorePool
TransientStroePool即短暂的存储池,RocketMQ单独创建了一个DirectByteBuffer内存缓存池,用来拎出存储数据库,数据先写入该内存映射中,然后由Commit线程定时将数据从该文件内存复制到与目标屋里文件对应的内存映射中。RokcetMQ引入该机制是为了提供一种内存锁 定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁 盘中。
# 6.3 RocketMQ存储文件
commitlog:消息存储目录
config:运行期间的一些配置信息,主要包括
- consumerFilter.json:主题过滤信息
consumerOffert.json:集群消费模式下的消息消费进度
delayOffset.json:延时消息队列拉取进度。
subscriptionGroup.json:消息消费组的配置信息。
topics.json:topic配置属性。
consumerqueue:消息消费队列存储目录
abort:如果存在abort文件,说明Broker非正常关闭,改文件默认在启动Broker时创建,在正常退出之前删除
index:消息索引文件存储目录
checkpoint:监测点文件,存储CommitLog文件最后一次刷盘时间戳,ConsumeQueue最后一次刷盘时间、index文件最后一次刷盘时间戳
# 6.3.2 ConsumeQueue文件
该文件可以看作CommitLog关于消息消费的索引文件,ConsumeQueue的第一级目录为消息主题,第二季目录为主题的消息队列
# 6.3.3 Index文件
RocketMQ引入哈希索引机制为消息建立索引,HashMap的设计包含两个基本点:哈希槽 与哈希冲突的链表结构。
- beginTimestamp:Index文件中消息的最小存储时间
- endTimestamp:Idnex文件中消息的最大存储时间
- beginPhyoffset:Index文件中消息的最小物理偏移量 (CommitLog文件偏移量)。
- endPhyoffset:Index文件中消息的最大物理偏移量 (CommitLog文件偏移量)。
- hashslotCount:hashslot个数,并不是哈希槽使用的个数,
- indexCount:Index条目列表当前已使用的个数,Index条目在 Index条目列表中按顺序存储。
一个Index默认包含500万个哈希槽。哈希槽存储的是落在该哈希 槽的哈希码最新的Index索引。默认一个Index文件包含2000万个条 目,每个Index条目结构如下。
- hashcode:key的哈希码
- phyoffset:消息对应的物理偏移量
- timedif:该消息存储时间与第一条消息的时间戳的差值,若小于0,则该消息无效
- pre index no:该条目的前一条记录的Index索引,当出现哈希冲突时,构建链表结构
# 6.3.4 checkpoint文件
checkpoint(检查点)文件的作用是记录ComitLog、 ConsumeQueue、Index文件的刷盘时间点,文件固定长度为4KB,其中 只用该文件的前面24字节。
1)physicMsgTimestamp:CommitLog文件刷盘时间点。
2)logicsMsgTimestamp:ConsumeQueue文件刷盘时间点。
3)indexMsgTimestamp:Index文件刷盘时间点。
# 6.4 ConsumQueue与Index文件恢复
RocketMQ是将消息全量存储在CommitLog文件中,并异步生成转发任务更新Consume Queue文件、Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于 某个原因宕机,就会导致CommitLog文件、ConsumeQueue文件、Index 文件中的数据不一致。如果不加以人工修复,会有一部分消息即便在 CommitLog文件中存在,由于并没有转发到ConsumeQueue文件,也永远 不会被消费者消费。那RocketMQ是如何使文件达到最终一致性的呢?
- 判断上一次退出是否正常,机制是Broker在启动时创建abort文件,在退出时通过注册JVM钩子函数删除abort文件。如果下一次启动时存在abort文件,说明Broker是异常退出的。
- 加载延迟队列
- 加载CommitLog文件,加载Commit目录下所有文件并按照文件名排序,如果文件与配置文件的单个文件大小不一致,忽略该目录下所有文件。
- 加载消息消费队列,遍历消息消费队列根目录,获取该Broker存储的所有主题,然后遍历每个主题,获取该主题下所有的消息消费队列,最后分别加 载每个消息消费队列下的文件,构建ConsumeQueue对象。
- 加载并存储checkPoint文件,主要用于记录CommitLog文 件、ConsumeQueue文件、Inde文件的刷盘点
- 加载Index文件,如果上次异常退出,而且Index文件刷 盘时间小于该文件最大的消息时间戳,则该文件将立即销毁
- 根据Broker是否为正常停止,执行不同的恢复策略,下 文将分别介绍异常停止、正常停止的文件恢复机制
- 恢复ConsumeQueue文件后,将在CommitLog实例中保存每个消息消费队列当前的存储逻辑偏移量。
# 6.5 文件刷盘机制
RocketMQ的存储与读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中,在根据配置的刷盘策略在不同时间刷盘,如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法,如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端。Rocket使用一个单独的线程按照某一个shedding的频率执行刷盘操作。
# 6.5.1 Broker同步刷盘
- 构建GroupCommitRequest同步任务并提交到 GroupCommitRequest。
- 等待同步刷盘任务完成,如果超时则返回刷盘错误,刷盘成功 后正常返回给调用方。
- 消费发送线程将消息追加到内存映射文件后,将同步任务 GroupCommitRequest提交到GroupCommitService线程,然后调用阻塞 等待刷盘结果,超时时间默认为5s,
为了避免同步刷盘消费任务与其他消息生产者提交任务产生锁竞 争,GroupCommitService提供读容器与写容器,这两个容器每执行完 一次任务后交互,继续消费任务,
GroupCommitService组提交线程,每处理一批刷盘请求后,如果 后续有待刷盘的请求需要处理,组提交线程会马不停蹄地处理下一 批;如果没有待处理的任务,则休息10ms,即每10ms空转一次
# 6.5.2 Broker异步刷盘
开启transientStorePoolEnable机制则启动异步刷盘方式。
如果transientStorePoolEnable为 true。RocketMQ会单独申请一个与目标物理文件同样大小的堆外内存,该对外内存将使用内存锁定,确保不会被置换到虚拟机内存中去,消息首先追加到堆外内存,然后提交到与物理文件的内存映射中,在经flush操作到磁盘。
如果transientStorePoolEnbale为false,消息将追加到与物理文件直接映射的内存中,然后写入磁盘
# 6.9 过期文件删除机制
如果非当前写文件在一定时间间隔内没有被再次更新,则认为是过期文件,可以被删除,默认每个文件 的过期时间为72h,
# 6.10 同步双写
在同步复制过程中,SendMessageThread线程可以继续处理其他消息,只是收到从节点的同步结果后再向客户端返回结果,重复利用Broker的资源。异步方式。
# 6.11 CommitLog、ConsumeQueue、Index关系
在RocketMQ中,CommitLog、ConsumeQueue和Index三个文件是用于消息存储、消费和查询的关键组件。
CommitLog文件是RocketMQ存储消息的主要文件,每个Broker节点都有自己的CommitLog文件。当生产者发送消息时,它会被追加到CommitLog文件的末尾。CommitLog文件中包含所有消息的完整副本,因此在发生故障或重启时,可以使用CommitLog文件来恢复数据。
ConsumeQueue文件是为了支持消息消费而创建的文件。它记录了每个消费者组消费的进度以及还未被消费的消息在CommitLog文件中的偏移量。每个主题(Topic)下的每个队列(Queue)都有一个对应的ConsumeQueue文件。
Index文件是为了提高消息查询效率而创建的文件。它记录了每个消息的索引信息,包括消息的Key、所属主题、队列编号、在CommitLog文件中的偏移量等。Index文件按照消息的Key排序,便于快速查找特定Key的消息。
因此,CommitLog文件中存储了所有的消息,而ConsumeQueue文件和Index文件则是为了支持消息消费和查询而创建的辅助文件。当消费者消费消息时,它会根据ConsumeQueue文件中记录的偏移量去CommitLog文件中读取消息;而当需要查询消息时,可以通过Index文件快速定位到消息的偏移量,然后再去CommitLog文件中读取相应的消息。
# 7. RocketMQ消息消费
# 7.1 Rocket MQ消息消费概述
消息消费以组的模式开展,一个消费组可以包含多个额消费者,每个消费组可以订阅多个主题,消费组之间有集群模式和广播模式俩种消费模式。集群模式是当前主题下的同一条消息只允许被其中一个消费者消费。广播模式是当前主题下的同一条消息将被集群内的所有消费者消费一次。
消息服务器与消费者之间的消息传送也有俩种方式:推模式和拉模式。
集群模式下,多个消费者如何对消息队列进行负载呢?一个消息队列同时只允许被一个消费者消费,一个消费者可以消费多个消息队列。
RocketMQ支持俩种消息过滤模式:表达式与类过滤模式。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上 的消息按顺序消费。
# 7.1.1 消费队列负载机制与重平衡
常用AVG、AVG_BY_CIRCLE、分配机制。如果消息在各个队列上平均分布,采用AVG。否则采用AVG_BY_CIRCLE
# 7.1.2 并发消费模型
RocketMQ支持并发消费与顺序消费俩种消费方式,消息的拉取于消费模型基本一致,只是顺序消费在某些环节为了保证顺序性,需要引入锁机制,RocketMQ的消息拉取与消费模式如图所示
一个MQ客户端只会创建一个消息拉取线程向Broker拉取消息,并且同一时间只会拉取一个topic中的一个队列,拉取线程一次向Broker拉取一批消息后,会提交到消费者组的线程池,然后向Broker发起下一个拉取请求。
RocketMQ客户端为每一个消费组创建独立的消费线程池,即在并发消费模式下,单个消费组内的并发度为线程池线程个数。
# 7.1.3 消息消费反馈机制
RocketMQ客户端消费一批数据后,需要向Broker反馈消息的消费 进度,Broker会记录消息消费进度,这样在客户端重启或队列重平衡 时会根据其消费进度重新向Broker拉取消息,消息消费进度反馈机制
- 消费线程池在处理完一批消息后,会将消息消费进度存储在本地内存中。
- 客户端会启动一个定时线程,每5s会将存储在本地内存中的所有队列消息消费偏移量提交到Broker中。
- Broker收到的消息消费进度会存储在内存中,每隔5s将消息消费偏移量持久化到磁盘文件中。
- 在客户端向Broker拉取消息时也会将该队列的消息消费偏移量提交到Broker。
再来思考一个问题,线程池如何提交消费偏移量?
# 7.2 消息消费者初探
# 7.3 消费者启动流程
第一步:构建主题订阅信息SubscriptionData并加入 RebalanceImpl的订阅消息中,如代码清单5-2所示。订阅关系来源主 要有两个。
- 通过调用DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression)方法获取。
- 订阅重试主题消息。RocketMQ消息重试是以消费组为单位,而 不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动时会 自动订阅该主题,参与该主题的消息队列负载。
第二步:初始化MQClientInstance、RebalanceImple(消息重新 负载实现类)等.
第三步:初始化消息进度,如果消息消费模式采用集群模式,那么消息进度存储到Broker中,如果采用广播模式,那么消息消费进度存储在Broker中。
第四步:如果是顺序消费,创建消费端消费线程服务。ConsumeMessageService主要负责消息消费,在内部维护一个线程池,
第五步:向MQlientInstance注册消费者并启动,JVM中的所有消费者、生产者持有同一个MQIientInstance。且只会启动一次。
# 7.4 消息拉取(重看)
# 7.4.1 并发消息拉取基本流程
消息拉取分为3个主要步骤。
- 拉取客户端消息拉取请求并封装
- 消息服务器查找消息并返回
- 消息拉取客户端处理返回的信息
# 7.8 消息过滤机制
RocketMQ支持表达式过滤和类过滤俩种消息过滤机制。表达式模式分为TAG与SQL92模式。SQL92模式以消息属性过滤上下文,实现SQL条件过滤表达式,而TAG模式就是简单为消息定义标签。根据消息属性tag进行匹配。
RocketMQ消 息过滤方式不同于其他消息中间件,是在订阅时进行过滤
消息发送者在消息发送时如果设置了消息的标志属性,便会存储在消息属性中,将其从CommitLog文件转发到消息消费队列中,消息消费队列会用8个字节存储消息标志的哈希码。在Broker端拉取消息时,遍历ConsumeQueue,只对比消息标志的 哈希码,如果匹配则返回,否则忽略该消息
消费端在收到消息后, 同样需要先对消息进行过滤,只是此时比较的是消息标志的值而不是 哈希码。
# 7.4 顺序消息
RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列 中的消息按顺序消费,如果需要做到全局顺序消费,则可以将主题配 置成一个队列,适用于数据库BinLog等严格要求顺序消息消费的场 景。并发消息消费包含4个步骤:消息队列负载、消息拉取、消息消 费、消息消费进度存储。
# 7.4.1 消息队列负载
RocketMQ首先需要通过RabalanceService线程实现消息队列的负载,集群模式下同一个消费组内的消费者共同承担其订阅主题下消息队列的消费,同一个消息消费队列在同一个时刻只会被消费组内的一个消费者消费,一个消费者同一时刻可以分配多个消费队列。
经过消息队列重新负载(分配)后,分配到新的消息队列时,首 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。 顺序消息消费与并发消息消费的一个关键区别是,顺序消息在创 建消息队列拉取任务时,需要在Broker服务器锁定该消息队列。
# 7.4.2 消息拉取
RocketMQ消息拉取由PullMessageService线程负责,根据消息拉 取任务循环拉取消息,
如果消息处理队列未被锁定,则延迟3s后再将PullRequest对象放 入拉取任务中,如果该处理队列是第一次拉取任务,则首先计算拉取 偏移量,然后向消息服务端拉取消息。
# 7.4.3 消息队列锁实现
顺序消息消费的各个环节基本都是围绕消息消费队列与消息处理队列展开的。拉取消息消费进度,要判断ProcessQueue的locked是否为true,为true的前提条件是消息消费者向Broker端发送锁定消息队列的请求并返回加锁成功。
ConcurrentMap mqLockTable:锁容器,以消息消费组分组,每个 消息队列对应一个锁对象,表示当前该消息队列被消费组中哪个消费 者所持有。
# 8 RocketMQ的ACL
# 8.1 什么是ACL
ACL(访问控制列表)与我们在应用系统中接触到的用户、资源、权限、角色类似,在RocketMQ中对应如下对象
- 用户:用户是访问控制的基本要素。在RocketMQ ACL实现中必 然也会引入用户的概念,即支持用户名、密码。
- 资源:需要保护的对象。在RocketMQ中,消息发送涉及的 topic和消息消费涉及的消费组等都应该进行保护,故可以抽象成资 源。
- 权限:可以简单地将权限理解为可以对资源进行操作,在 RocketMQ ACL中主要包含topic的发送权限、对topic的订阅权限等。
- 角色:RocketMQ中只定义了两种角色,管理员和非管理员。
# 9. RocketMQ主从同步机制
# 9.1 RocketMQ主从同步原理
1)HAService:RocketMQ主从同步核心实现类。 2)HAService$AcceptSocketService:高可用主服务器监听客户 端连接实现类。 3)HAService$GroupTransferService:主从同步通知实现类。 4)HAService$HAClient:HA客户端实现类。 5)HAConnection:HA主服务器高可用连接对象的封装,也是 Broker从服务器的网络读写实现类。 6)HAConnection$WriteSocketServicce:高可用主节点网络写实 现类。 7)HAConnection$ReadSocketService:高可用主节点网络读实现 类。
# 10. RocketMQ消息轨迹
消息轨迹简单来说就是日志,把消息的生产、存储、消费等所有的访问和操作日志都详细记录下来。
# 10.2 消息轨迹设计原理
RocketMQ消息轨迹主要用于跟踪消息发送、消息消费的轨迹,详细记录消息各个处理环节的日志。
从设计上至少需要解决以下三个问题:
- 消息轨迹数据格式
- 采集轨迹数据
- 存储消息轨迹数据
如何采集轨迹数据
消息中间件的两大核心主题是消息发送和消息消费,核心载体是 消息。消息轨迹(消息的流转)主要是记录消息何时发送到哪台 Broker、发送耗时是多少、在什么时候被哪个消费者消费等信息。
通常为了避免消息轨迹的数据与正常的业务数据混在一起,官方建议 在Broker集群中新增一台机器,只在这台机器上开启消息轨迹跟踪, 这样该集群内的消息轨迹数据只会发送到这一台Broker服务器上,并 不会增加集群内原先业务Broker的负载压力。
# 11.RocketMQ主从切换
# 11.1 主从切换引入目的
因为主从切换机制能极大地提高集群的可用性,所以RocketMQ 4.5.0开始正式支持这一特性。
Raft协议主要包括俩个部分:Leader选举和日志复杂
# 11.2 Raft协议简介
在复制组内实现主从切换的一个基本前提是复制组内各个节点的 数据必须一致,否则主从切换后将会造成数据丢失。Raft协议是目前 分布式领域一个非常重要的一致性协议,RocketMQ的主从切换机制也 是介于Raft协议实现的。
Raft协议主要包含两个部分:Leader选举和 日志复制。
# 11.2.1 Leader选举
Raft协议的核心思想是在一个复制组内选举一个Leader节点,后续统一由Leader节点处理客户端的读写请求,从节点只是从Leader节点复制数据,即一个复制组在接受客户端的读写请求之前,要先从复制组中选择一个Leader节点,这个过程称为Leader选举。
# 11.2.2 日志复制
客户端向DLedger集群发起一个写数据请求,Leader节点 收到写请求后先将数据存入Leader节点,然后将数据广播给它所有的 从节点。从节点收到Leader节点的数据推送后对数据进行存储,然后 向主节点汇报存储的结果。Leader节点会对该日志的存储结果进行仲 裁,如果超过集群数量的一半都成功存储了该数据,则向客户端返回 写入成功,否则向客户端返回写入失败。
# Rocket面试题
# RocketMQ性能为什么比较高
- Netty 高校的NIO框架
- RocketMQ大量使用多线程、异步
- 采用零拷贝技术优化(MMAP) 性能提高50%
- 采用文件存储,顺序读写
- 锁优化(CAS机制无锁化)
- 存储设计:读写分离
# 如何设计消息队列
存储怎么设计:高可用-磁盘存储、顺序读写、零拷贝技术
可伸缩:分布式
消息的丢失
网络框架
# Raft协议
Raft协议是一种分布式共识算法,用于保证分布式系统中的一致性。它通过选举一个领导者来管理多个节点,实现数据在不同节点间的复制与同步。Raft协议分为三个部分:领导者选举、日志复制和安全性。
在Raft协议中,每个节点可以处于三种状态之一:领导者、跟随者或候选者。初始时所有节点都是跟随者状态。当某个节点发起选举后,其他节点将成为候选者,并向其他节点发送投票请求。如果候选者得到大多数节点的支持,则成为新的领导者。
领导者负责处理客户端请求,将操作转换成日志并复制到其他节点。其它节点将拒绝任何来自非领导者的日志请求。领导者同时还需要定期进行心跳检查以保持其领导地位。
通过这种方式,Raft协议可以确保分布式系统中的所有节点达成一致状态,从而提高系统可靠性和容错性。
# RocketMQ中的路由注册
RokcetMQ中如何进行路由剔除:使用的是路由扫描,间隔时间超过120秒的Broker会被剔除
# RocketMQ的总体架构
NRC、NRS
# RocketMQ事务消息交互流程
RocketMQ事务回查机制
# RokcetMQ如何实现顺序消息
RocketMQ提供了一种称为“顺序消息”的消息投递方式,即保证相同的消息按照发送的顺序被消费,这对于一些需要保证业务逻辑正确性的场景非常重要。
RocketMQ实现顺序消息的主要思路是,将同一个业务流程的消息发送到同一个消息队列中,然后在消费端按照顺序消费这些消息,这样就能保证同一个业务流程的消息被按照发送的顺序被消费了。
具体实现可以分为以下几个步骤:
- 在Producer端将同一个业务流程的消息发送到同一个Topic中的同一个队列中。可以使用MessageQueueSelector或者SendCallback等方式进行消息的发送和选择。
- 在Consumer端,保证同一个业务流程的消息被分配到同一个消费者实例上,这可以通过设置MessageListenerOrderly来实现。
- 消费者实例在消费消息时,按照顺序依次消费这些消息。
需要注意的是,这种实现方式仅仅能保证同一个业务流程的消息被顺序消费,对于不同业务流程之间的消息顺序并不能保证。另外,由于网络波动等原因,消息消费的顺序并不能完全保证,只能尽可能地减小消息消费的乱序情况。