本文 的 原文 地址
原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址
本文作者:
- 第一作者 老架构师 肖恩(肖恩 是尼恩团队 高级架构师,负责写此文的第一稿,初稿 )
- 第二作者 老架构师 尼恩 (45岁老架构师, 负责 提升此文的 技术高度,让大家有一种 俯视 技术、俯瞰技术、 技术自由 的感觉)
尼恩说在前面:
在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、shein 希音、shopee、百度、网易的面试资格,遇到很多很重要的面试题:
MQ消息积压、如何监控、如何排查?
你知道如何解决 MQ消息积压 嘛?
你们生产环境 这么高的吞吐量,没有出现过 MQ积压吗? 不可能吧? 怎么解决 的?
前几天 小伙伴面试 美团,遇到了这个问题。但是由于 没有回答好,导致面试挂了。
小伙伴面试完了之后,来求助尼恩。那么,遇到 这个问题,该如何才能回答得很漂亮,才能 让面试官刮目相看、口水直流。
所以,尼恩给大家做一下系统化、体系化的梳理,使得大家内力猛增,可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”,然后实现”offer直提”。
尼恩提示:回答问题的时,首先 从一个 现场故事讲起。
现场事故:凌晨2点 报警群突然炸了
凌晨2点, 凌晨2:15,监控系统触发MQ积压告警,核心业务消息队列积压量在15分钟内从50万激增至1000万条,导致订单支付、物流推送等核心业务链路出现严重延迟。
报警群突然炸了——各种催命电话, 把我从被窝里 催起来。
我顶着黑眼圈打开MQ 监控一看,发现上游系统异常推送800万条营销消息 。
- 直接诱因:夜间批量促销活动未做流量评估,上游系统异常推送800万条营销消息
- 架构缺陷:消费者采用单线程串行消费模式,监控阈值设置过高(500万才触发告警)
然后,加班熬夜一通宵 的时间,彻底把问题解决。并且,把我的绝招整理成了下面的方案,提供给其他同学学习使用。
方案分两个部分:
- 紧急的 止血救命包 (临时方案)
- 长期的 架构治根 ( 长期方案)
紧急止血 + 架构治根 ,一共是5个步骤
第1步: 定位原因:
第2步: 紧急止血包-极速扩容:
临时扩容Consumer实例和开启批量消费。
第3步: 弃卒保帅-非核心业务降级:
暂停非关键业务的消费者和降低非关键业务消费者线程数。
第4步: 并行爆破-上重武器:
消息转储 和 消费者更大规模扩容。
第5步:架构治根。 长治久安-上牛逼的架构方案:
高吞吐架构升级和高并发压的应急预案。
第1步:定位原因
- 生产侧问题(较少见,10%)
- Broker侧问题(较少见,10%)
- 消费侧问题(最可能,80%)
第2步:紧急止血包(临时消费者扩容)
第3步. 弃卒保帅:消费者降级
- 暂停非关键业务的消费者
- 降低非关键业务消费者线程数。
第4步. 并行爆破:上重武器
消息转储 和 消费者更大规模扩容。
第5步. 架构治根:架构升级, 长治久安
接下来,尼恩给大家说详细方案:
第1步:定位消息积压原因
在消息处理流程中,若客户端的消费速度跟不上服务端的发送速度,未处理的消息会不断累积,这部分消息就是堆积消息。
消息堆积会直接导致消费延迟,想要高效排查和解决这类问题,首先定位原因。
定位消息积压原因
遇到消息积压时,很多人第一反应是“扩容消费者”,但在操作前必须先明确:到底是什么拖慢了消费速度?
RocketMQ的消费链路就像一条流水线,任何环节“堵车”都会引发积压,我们得先给这条流水线做个全面“检查”。
MQ消息积压的核心本质:生产速率>消费速率,导致消息在Broker队列中堆积。
全链路分析如下:
生产侧问题(较少见,10%)
- 业务高峰(如秒杀、大促)、补偿机制重发、生产端线程池失控等导致的瞬时流量冲击。
Broker侧问题(较少见,10%)
- 磁盘IO瓶颈(PageCache刷盘慢);
- 主从同步延迟;
- 网络分区或资源限制。
消费侧问题(最可能,80%)
(1)性能瓶颈:
- Consumer陷入死循环,导致卡死;
- 业务逻辑复杂(如慢SQL、外部API调用、高耗时计算);
- 单条消息处理时间过长(超过100ms需警惕)。
(2)资源不足:
- 消费者实例数量不足(未随流量动态扩容);
- 消费者宕机或线程阻塞(如GC停顿、死锁)。
(3)配置缺陷:
- 顺序消费中单条消息卡住会阻塞整个队列(顺序消息会持续重试,普通消息仅重试16次);
- 广播模式下重复处理导致效率低下。
(4)重试风暴:
消息积压本质是“生产速度>消费速度+Broker转发能力”。
由于Broker通常是高可用集群,生产侧若无人工故障也较稳定,因此排查时应优先考虑消费侧问题。
大致的排查步骤如下:
排查Consumer是否处于“假死”状态
打开RocketMQ Dashboard(运维必备工具),查看Consumer分组的“在线客户端”列表。若某台服务器的Consumer长时间未上报心跳(LastHeartbeatTime超过2分钟),大概率是“消费者假死”。
这种情况多因消费者线程被Full GC卡住或代码中存在死循环。例如曾遇到某台服务器因循环中频繁打印日志导致CPU占用100%,Consumer线程直接卡死,积压量持续增加。
注意:需为Consumer配置JVM监控,重点关注GC频率和耗时。比如假死机器的Young GC耗时超500ms,老年代频繁Full GC,就会直接影响Consumer正常工作。
检查队列负载是否均衡
RocketMQ的Consumer采用“队列均分”策略,每个Consumer分配多个Message Queue(MQ)。若某台Consumer分配100个MQ,另一台仅分配10个,会导致“忙闲不均”。
通过Dashboard可查看每个Consumer实例的“已分配队列数”。比如三台新扩容服务器因网络配置问题未连接NameServer,导致老服务器承担80%队列,消费能力被压垮。
实操建议:若队列分配不均,可先重启Consumer实例触发重新负载均衡;若问题持续,检查Consumer分组配置,确保consumeFromWhere和messageModel设置正确(默认CLUSTERING模式会自动均衡)。
消费端负载均衡策略详细情况,参考最后一个小节
检查消费线程是否“效率低下”
RocketMQ Consumer默认消费线程数为20(由consumeThreadMin和consumeThreadMax控制)。若业务逻辑复杂(如涉及数据库查询、接口调用),20个线程可能不足,导致大量任务排队。
比如日志中发现线程池任务堆积量超1000,而实际工作线程仅10个——因为初始化时误将consumeThreadMin和Max均设为10,无法应对流量激增。
重点:线程数并非越多越好,需结合CPU核心数调整。IO密集型任务可设为CPU核心数的5-10倍(如50);CPU密集型任务超过32通常无意义,反而会因上下文切换降低效率。
节点线程数计算模型:
单节点并发度需合理设置,过大易增加线程切换开销。理想环境下最优线程数计算模型:
- 单机vCPU核数为C;
- 忽略线程切换耗时,I/O操作不消耗CPU;
- 线程有足够消息处理,内存充足;
- 逻辑中CPU计算耗时为T1,外部I/O操作为T2。
则单个线程的TPS为1/(T1+T2),若CPU使用率达100%,单机最大线程数为C*(T1+T2)/T1。
第2步:紧急止血包(临时消费者扩容)
明确原因后进入“急救阶段”,需先让消费速度追上生产速度,再逐步消化历史积压。
第一招:临时扩容Consumer
这是最直接的方法,相当于增加高速公路车道。RocketMQ的Consumer无状态,理论上可无限扩容,但需注意两点:
扩容数量不超过MQ总数
每个MQ同一时间仅能被一个Consumer消费。
例如集群有100个MQ,最多可扩容至100个Consumer实例(每个实例分配1个MQ);
若集群有200个MQ且当前仅10个Consumer,理论上可先扩容至50个实例,充分利用队列资源。
第二招:开启批量消费,提高单次处理量
RocketMQ支持批量消费,默认每次拉取1条消息(参数consumeMessageBatchMaxSize默认值为1)。
若业务允许,可改为一次拉取10-32条,减少网络交互,提升吞吐量。
比如将该参数改为16,配合扩容后消费速度从500条/秒提升至8000条/秒——相当于从每次搬1箱货变为搬16箱,效率显著提升。但需注意:
保持幂等性
批量处理可能出现重复消费(如处理到第10条时消费者挂了,重启后16条消息会重新消费),因此业务代码必须支持幂等(如用唯一ID去重)。比如因未做幂等导致数据库出现重复订单,后面还得脚本去重。
避免参数过大
超过32后吞吐量提升不明显,反而增加内存压力。曾尝试设为100,导致Consumer内存使用率超80%,险些触发OOM,最终确定16-32为最佳范围。
第3步. 弃卒保帅:消费者降级+ 暂停Producer或限流
消费者降级
- 暂停非关键业务的消费者
- 降低非关键业务消费者线程数。
暂停Producer或限流,控制消息源头
若积压量极大(比如千万级以上)且消费速度短期内无法追上,可暂时让Producer停止发消息或降低发送频率。
注意:暂停Producer前必须与业务方沟通。
例如电商大促期间,暂停支付回调消息会影响商家收款,比如与前端协商在用户支付成功页增加“稍后刷新”提示,同时将Producer从2000 TPS限流至500 TPS,为消费者争取缓冲时间。
注意:
暂停后需监控Consumer的“堆积量”是否下降(理想状态为每分钟下降10-20万条)。
若未变化,可能是消费者重试逻辑导致消息反复投递(如消息处理失败后进入重试队列,积压量“假死”),此时需检查maxReconsumeTimes参数(默认16次,超过后进入死信队列)。
第4步. 并行爆破:上重武器
消息转储 和 消费者更大规模扩容。
解决第一步 临时扩容场景下的 MQ 分区总数不足的解决方案
若前期MQ数量不足(如仅4个MQ且已分配4个Consumer),第一步的临时扩容Consumer 意义不大,可按以下步骤处理:
1、 临时转储队列 : 创建原队列数10倍(或N倍)的新Topic , 也就是 临时转储队列;
2、 消息转储 : 开发临时转发程序,将积压消息均匀分发至新Topic的队列中;
3、 消费者更大规模扩容: 对应扩容Consumer(10倍),每个Consumer消费一个临时队列,同时扩容依赖的业务服务(如缓存、数据库);
4、消费完成后恢复原有架构,避免资源浪费。
对应扩容Consumer(10倍),每个Consumer消费一个临时队列,同时扩容依赖的业务服务(如缓存、数据库);
实操步骤:
1、 临时创建新Consumer分组(如加后缀-tmp),避免与原有消费者竞争资源;
2、 启动时指定--consumerThreadMin 50 --consumerThreadMax 50(临时调高线程数);
3、 观察Dashboard的“消费速度”,理想状态下每台新服务器分配4-5个MQ,消费速度可提升3-5倍。
第5步. 架构治根:架构升级,彻底 根治
5.1: 高吞吐架构升级:从 “被动应对” 到 “主动防御”
高吞吐架构的核心目标是:让生产速率≤消费速率 + Broker 承载能力,从根源上减少积压风险。需从生产端、消费端、Broker 端三个维度系统性优化,结合业务场景(如秒杀、大促)设计针对性方案。
(1)生产端:控制 “消息源头” 的速率与质量
生产端是消息的 “起点”,需通过限流、瘦身、异步化等手段,避免瞬时流量冲击 MQ。
生产端 动态限流:给生产端装 “刹车”
核心措施:基于 MQ Broker 的 “消息堆积量” 动态调整生产速率。
实现方式:
在 Producer 端集成 “积压量监测接口”(如调用 RocketMQ Dashboard 的/topic/stats接口),当某 Topic 积压量超过 50 万条时,自动触发限流(通过令牌桶算法将 TPS 从 2000 降至 500)。
业务适配:
秒杀场景下,结合前端限流(如按钮置灰、排队提示)和后端限流(Redis 计数器 + Lua 脚本),确保生产速率不超过消费端最大处理能力的 80%(预留 20% 缓冲)。
生产端 消息 “瘦身”:减少无效数据传输
核心问题:大消息(>1MB)会导致 Broker 存储效率下降、消费端处理耗时增加(如解析大 JSON 耗时 100ms+)。
优化措施:
- 消息体只保留 “核心字段”(如订单 ID、用户 ID、金额),非核心字段(如用户地址、商品详情)通过 “消息 + 数据库” 组合获取(消费端拿到消息后,再查 DB 补充信息);
- 大字段压缩:使用 Protobuf 替代 JSON(压缩率提升 50%+),或对超过 500KB 的消息进行 GZIP 压缩;
- 禁止 “日志型消息”:如非必要,不将接口调用日志、调试信息写入 MQ(改用 ELK 日志系统)。
(2)消费端:提升 “处理效率” 与 “容错能力”
消费端是消息处理的 “主力”.
消费端 需通过并行化、轻量化、隔离化设计,将单条消息处理耗时压缩至 50ms 以内(非复杂业务)。
队列拆分:
按 “业务类型” 或 “用户 ID 哈希” 拆分 Topic 队列,避免单队列阻塞。
示例:原 “订单消息” Topic(100 个队列)拆分为 “支付订单”(50 个队列)、“取消订单”(30 个队列)、“退款订单”(20 个队列),分别对应独立消费者组,避免某类消息(如退款)处理慢阻塞全量;
线程池优化:
核心参数优化:
IO 密集型业务(如调用外部 API、查 DB)→ 线程数 = CPU 核心数 ×5(如 8 核 CPU→40 线程);
CPU 密集型业务(如数据计算)→ 线程数 = CPU 核心数 ×2;
线程池隔离:
使用 Hystrix 或 Resilience4j 为不同消息类型分配独立线程池(如支付消息用payThreadPool,物流消息用logisticsThreadPool),避免某类消息线程池满导致全局阻塞。
业务逻辑轻量化:砍掉 “慢操作”
慢 SQL 优化:消费端涉及的 DB 操作必须加索引,禁止select *、复杂 JOIN(耗时>50ms 的 SQL 需拆分或异步化);
外部依赖缓存:调用第三方接口(如支付回调、物流查询)时,增加本地缓存(Caffeine,过期时间 5 分钟)+ 分布式缓存(Redis),减少远程调用耗时(从 200ms→10ms);
异步处理非核心步骤:如订单消息消费时,“扣减库存”(核心)同步处理,“发送短信通知”(非核心)丢入本地线程池异步执行(失败后不影响主流程)。
批量消费 + 幂等设计:提升吞吐量 + 防重复
批量消费参数固化:consumeMessageBatchMaxSize固定为 16-32(经实测,此范围吞吐量提升最明显,且内存可控);
幂等实现:
- 消息层面:为每条消息生成唯一 ID(如 UUID),消费端首次处理时写入 “消息处理表”(ID + 状态),重复消息直接跳过;
- 业务层面:订单支付消息通过 “订单 ID + 支付状态” 去重(如已支付的订单,再次收到支付消息时直接返回成功)。
(3)Broker 端:强化 “承载能力” 与 “稳定性”
Broker 是消息存储与转发的核心,需通过硬件升级、集群扩容、参数优化提升极限承载能力(目标:单 Broker 支持 10 万 TPS+,集群支持百万 TPS+)。
硬件与存储优化
- 磁盘:使用 SSD(随机读写速度是机械硬盘的 10 倍 +),且单 Broker 磁盘容量≥1TB(避免频繁清理旧消息);
- 内存:为 Broker 配置足够大的 PageCache(如 16 核 32G 机器,分配 16G 作为 PageCache),减少磁盘 IO 压力(消息先写入 PageCache,再异步刷盘);
- 网络:Broker 节点间使用万兆网卡,避免跨机房部署(主从节点同机房,延迟控制在 1ms 内)。
集群扩容与负载均衡
- 集群规模:按 “生产 TPS×2” 配置 Broker 节点(如预估生产端 50 万 TPS,集群部署 10 个 Broker 节点,单节点承载 5 万 TPS);
- 队列均衡:每个 Topic 的队列数 = Broker 节点数 ×8(如 10 个 Broker→80 个队列),确保队列均匀分布在各 Broker(避免某台 Broker 负载过高);
- 主从架构:每个 Broker 配置 1 个从节点,主节点故障时自动切换(RocketMQ 支持主从自动切换,切换时间<30 秒)。
刷盘与清理策略优化
- 刷盘策略:高吞吐场景用ASYNC_FLUSH(异步刷盘,写入 PageCache 即返回成功,由后台线程定时刷盘),牺牲部分一致性换性能;
- 消息清理:设置合理的fileReservedTime(消息保留时间),非核心消息保留 24 小时,核心消息保留 7 天(避免旧消息占用磁盘空间)。
5.2 : 应急预案:从 “无序应对” 到 积压的 “标准化处理流程”
由于平台 篇幅限制, 此处省略1000字+
剩下的内容,请参加原文地址原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |