【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析

前言介绍


RocketMQ 是个消息服务器,也是个网络服务器。接下来我们将从网络 IO 模型,线程模型,看看 RocketMQ 是如何设计的。




IO 模型


RocketMQ 使用了 Netty 作为网络通信框架,自然而然使用了 Reactor 模型,或者说 Select 模型、Epoll 模型。即一个线程管理 N 个 Socket 的模式,此模式可管理海量连接,基本是所有网络服务器的首选。



配置相关的RocketMQ的配置


RocketMQ的Boss线程数为 1, Worker 线程数为 CPU * 2.

在说线程模型之前,先看看 RocketMQ 如何设计 Server 接口的。

image.png

RemotingService作为顶层接口,定义了 启动和关闭,另外还有注册 RPC 钩子,职责简单。他的两个子接口 RemotingServer 和 RemotingClient 各自增加了自己的抽象接口。


  • Server 专属的 localListenPort 和 getProcessorPair
  • Client 专属的 getNameServerAddressList 等


注意:两者都有 invokeSync 方法,但,参数不同,这也是因为他们自身的角色不同所影响的。


至于NettyRemotingAbstract 抽象类,这只是个简单的”抽取重复代码”的“简单操作”。

再下面,就是具体实现类,每个类,都有内部类,都是Netty各种Handler 的实现:

image.png

  • NettyConnectManageHandler 负责处理 注册,连接,异常等事件,继承自 ChannelDuplexHandler。
  • NettyServerHandler 则是关键的业务处理类,处理真正的 Msg,继承自 SimpleChannelInboundHandler。
  • HandshakeHandler 负责处理握手程序,这里就不解释了。


以上 3 个是 Server 端的 Handler,都是NettyRemotingServer的内部类。



线程模型


NettyServerHandler作为处理业务的关键类,每个 worker 线程都有自己的单独实例,但该类只是做个包装或者桥接而已,作用不大, NettyRemotingServer才是关键。


当 Request 进入到 Server 中,MQ 会根据 请求类型 code 找到对应的处理器,MQ 有多种处理器,如下:

image.png


他们都继承自 NettyRequestProcessor 接口:

image.png

此接口只有 2 个方法,处理请求和拒绝请求,处理请求的参数是 Netty 的 context 和自身的RemotingCommand 对象,这是个大对象:

image.png


RemotingCommand 的成员变量,这里说下 flag 的作用,其他就不说了。


flag表示这次请求是什么类型。


  • 倒数第一位,0 表示请求,1 表示返回。
  • 倒数第二位,1 表示 oneWay。



NettyRequestProcessor


刚刚提到 NettyRequestProcessor ,这是个处理器,在 MQ 中,每个 NettyRequestProcessor 都绑定了一个线程池,在 MQ 的抽象里,有个 Pair 对象,如下:

public class Pair<T1, T2> {
    private T1 object1;
    private T2 object2;
    public Pair(T1 object1, T2 object2) { this.object1 = object1; 
                                         this.object2 = object2;}
    public T1 getObject1() { return object1;}
    public void setObject1(T1 object1) {this.object1 = object1; }
    public T2 getObject2() { return object2;}
    public void setObject2(T2 object2) {  this.object2 = object2;}
}
复制代码

同时,还有个 Hash 表,用 code 映射了 Pair。如此,就实现了:通过请求 code 找到“线程池和处理这种请求的处理器”,然后,提交一个任务到该线程池,任务中,会调用该处理器的 processRequest 方法,或 rejectRequest 方法。

image.png

上图中,为处理请求的关键步骤。执行钩子就不说了,我们知道,设计代码时,关键步骤都加钩子,便于扩展和以后加代码。


其中,会调用 processRequest 方法,执行具体业务,并得到返回值。然后使用 netty 的 ctx 对象,将返回值直接写回 Socket。


如果发生错误了,也将错误构造成消息,写回客户端。


注意,这里一直有个操作 就是 response.setOpaque(opaque) ,就是设置请求 ID,这是 IO 多路复用的关键。


Netty 每次请求,都会调用 NettyRemotingServer 的 processRequestCommand 方法。


而 NettyRemotingServer 保存了请求 code 和 Pair<处理器,线程池> 的hash 映射表。


每次请求,根据 code 找到线程池,生成一个新任务,提交到线程池,任务里,会执行“处理器” 的processRequest 方法得到返回值,最后写回客户端。


MQ 为每种类型的任务,使用了不同的线程池,即线程池隔离。同时,也根据每种不同的任务类型,设置了不同的线程池参数。




参数介绍


  • Send 发送消息任务,线程池大小是1。
  • pull 拉取消息任务,线程池大小是 16 + CPU*2
  • query 查询任务,线程池大小是 8 + CPU*2;


当然还有其他的,这里就不枚举了,注意:大部分线程池都是多线程,只有 send 任务默认是单线程。


send 操作是个写操作,最后是要上锁的,虽然锁的粒度已经足够小,但仍然是有锁的。如果是有锁的,多线程的是不划算的。这也是 RocketMQ 的设计决定只写一个 CommitLog。


能像Kafka一样,同时写多个文件,是不是就可以利用多线程了呢?


当然,这里不是说多线程一定好,只是表达另外一种思路。如果单线程就能触发 MQ 瓶颈,多线程也没啥意义。


总结


image.png


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
16天前
|
消息中间件 监控 API
在Python中如何实现微服务架构,及相关的服务间通信方案?
Python微服务架构涉及服务划分、注册发现、通信协议选择(如HTTP、gRPC、消息队列)及服务间通信实现。每个服务应自治,有独立数据库和部署流程,并需考虑容错(如分布式事务、重试、熔断)和监控日志。API网关用于请求管理和路由。实际操作需根据需求和技术栈调整,并关注服务拆分和数据一致性。
42 5
|
8天前
|
UED
服务架构中的数据驱动设计
【5月更文挑战第13天】数据驱动设计是依据用户数据进行网页设计的方法,旨在通过测试了解用户需求并优化体验,从而增加流量和转化率。设计师应避免主观感受影响设计,因个人偏好可能与用户需求不符。数据驱动设计能减少偏见,提高转化率和销售额,是一个迭代过程,不断实验和优化。虽然有些人担忧可能限制创造力,但其实它仍需要创新和妥协。随着业务、用户和技术变化,数据驱动设计提供持续改进的解决方案。
37 0
服务架构中的数据驱动设计
|
16天前
|
消息中间件 存储 Apache
MQ产品使用合集之有RocketMQ arm架构的镜像吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
207 1
|
16天前
|
JSON JavaScript 前端开发
KOI 后台新的架构下,webshop如何消费后台服务 - websocket 初始化
KOI 后台新的架构下,webshop如何消费后台服务 - websocket 初始化
14 0
|
16天前
|
缓存 微服务
01.【微服务架构】服务注册与发现:AP和CP,你选哪个? -- 客户端容错
【5月更文挑战第12天】客户端容错机制确保在服务端或注册中心故障时仍能正确发送请求。当服务端崩溃,由于延迟,客户端一段时间内仍会尝试发送请求。客户端应实施 failover 策略,即检测到调用失败后,切换到其他节点重试,并将故障节点从列表移除。延时通常等于服务端与注册中心心跳间隔加通知时间。若网络问题导致客户端无法访问服务端,客户端应发送心跳以检测服务端状态,成功则恢复,连续失败则视为崩溃。若客户端无法连接注册中心,它应使用本地缓存并考虑退出。
19 1
01.【微服务架构】服务注册与发现:AP和CP,你选哪个? -- 客户端容错
|
16天前
|
Kubernetes Cloud Native 持续交付
探索云原生架构的未来:如何优化资源管理和服务部署
【5月更文挑战第6天】 随着云计算的快速发展,云原生技术已成为企业数字化转型的关键驱动力。此篇文章深入探讨了云原生架构的核心组件及其在资源管理和服务部署方面的优化策略。通过分析容器化、微服务及自动化管理的实践案例,本文旨在为读者提供一套系统的方法论,以利用云原生技术实现更高效、灵活且可靠的IT基础设施。
36 2
|
16天前
|
微服务
01.【微服务架构】服务注册与发现:AP和CP,你选哪个?-- 高可用性
【5月更文挑战第4天】注册中心通过心跳检测服务端状态,当心跳失败时预判服务端崩溃并通知客户端停止使用。心跳机制应对网络不稳定,需平衡重试次数与间隔,避免误判和延迟。即使如此,从服务端宕机到客户端获知仍存在时间差,因此需要客户端具备容错能力。
30 0
|
16天前
|
微服务
01.【微服务架构】服务注册与发现:AP和CP,你选哪个?-- 服务端崩溃检测
【5月更文挑战第3天】保证服务注册与发现的高可用需关注三个方面:服务端崩溃检测、客户端容错和注册中心选型。服务端崩溃时,注册中心通过心跳检测来识别,若心跳中断,立即通知客户端服务不可用,同时持续尝试恢复心跳。若一段时间后仍无法连接,则断定服务端彻底崩溃。这种方法兼顾及时故障通知和防止误判。
36 8
|
16天前
|
微服务 中间件 Nacos
01.【微服务架构】服务注册与发现:AP和CP,你选哪个?-- 面试准备+基本模型
【5月更文挑战第2天】面试准备应涵盖公司所使用的注册中心类型及其优缺点,了解其集群规模、QPS和机器性能。准备故障排查及优化案例。若公司未采用微服务,可熟悉ZooKeeper、Nacos或etcd的基本特性以讨论注册中心概念。面试时,可将话题引导至服务注册与发现,如被问及特定中间件,阐述为何选择它并讨论优缺点。当涉及微服务高可用性时,可强调服务注册与发现的作用。基础模型部分,需解释服务上线和下线流程,提及注册数据和分组功能,并举例说明。最后,简述服务注册与发现的高可用挑战。
35 8
|
16天前
|
缓存 微服务
01.【微服务架构】服务注册与发现:AP和CP,你选哪个?-- 服务注册与发现模型
【5月更文挑战第1天】本文探讨了服务注册与发现的关键作用,在微服务架构中,这一概念常出现在面试中。文章深入讲解基础模型,包括服务端注册、心跳维持、客户端缓存及服务端下线流程,并强调了高可用性的重要性,涉及服务端崩溃检测、客户端容错和注册中心选型。通过分析客户端、注册中心和服务端之间的交互,提出如何应对潜在故障的策略,以构建稳定的微服务架构。
37 3
01.【微服务架构】服务注册与发现:AP和CP,你选哪个?-- 服务注册与发现模型

相关产品

  • 云消息队列 MQ
  • http://www.vxiaotou.com