登录
/
注册
首页
论坛
其它
首页
科技
业界
安全
程序
广播
Follow
关于
导读
排行榜
资讯
发帖说明
登录
/
注册
账号
自动登录
找回密码
密码
登录
立即注册
搜索
搜索
关闭
CSDN热搜
程序园
精品问答
技术交流
资源下载
本版
帖子
用户
软件
问答
教程
代码
写记录
写博客
小组
VIP申请
VIP网盘
网盘
联系我们
发帖说明
道具
勋章
任务
淘帖
动态
分享
留言板
导读
设置
我的收藏
退出
腾讯QQ
微信登录
返回列表
首页
›
业界区
›
业界
›
Netty源码—3.Reactor线程模型二
Netty源码—3.Reactor线程模型二
[ 复制链接 ]
但婆
2025-6-3 14:53:31
猛犸象科技工作室:
网站开发,备案域名,渗透,服务器出租,DDOS/CC攻击,TG加粉引流
大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
5.NioEventLoop的执行总体框架
(1)Reactor线程所做的三件事情
(2)处理多久IO事件就执行多久任务
(3)NioEventLoop.run()方法的执行流程
(1)Reactor线程所做的三件事情
NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。
一.首先是调用select()方法进行一次事件轮询
由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。
二.然后调用processSelectedKeys()方法处理轮询出来的IO事件
三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
private volatile int ioRatio = 50;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void select(boolean oldWakenUp) throws IOException {
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
//2.轮询过程中发现有任务加入,中断本次轮询
//3.阻塞式select操作: selector.select(timeoutMills)
//4.避免JDK空轮询Bug
}
}
...
}
复制代码
(2)处理多久IO事件就执行多久任务
在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
private volatile int ioRatio = 50;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
...
}
}
...
}
复制代码
(3)NioEventLoop.run()方法的执行流程
NioEventLoop.run() -> for(;;)
select() //执行一次事件轮询检查是否有IO事件
processSelectedKeys() //处理产生IO事件的Channel
runAllTasks() //处理异步任务队列
//这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的
复制代码
6.Reactor线程执行一次事件轮询
(1)执行select操作前设置wakeUp变量
(2)定时任务快开始了则中断本次轮询
(3)轮询中发现有任务加入则中断本次轮询
(4)执行阻塞式select操作
(5)避免JDK的空轮询Bug
(6)执行一次事件轮询的总结
(1)执行select操作前设置wakeUp变量
NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
//Boolean that controls determines if a blocked Selector.select should break out of its selection process.
//In our case we use a timeout for the select method and the select method will block for that time unless waken up.
private final AtomicBoolean wakenUp = new AtomicBoolean();
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
}
}
...
}
复制代码
如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
//2.轮询过程中发现有任务加入,中断本次轮询
//3.阻塞式select操作: selector.select(timeoutMills)
//4.避免JDK空轮询Bug
}
}
...
}
复制代码
(2)定时任务快开始了则中断本次轮询
NioEventLoop中的Reactor线程的select操作也是一个for循环。
在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。
Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();//当前时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();//非阻塞执行select操作
selectCnt = 1;
}
break;
}
...
}
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
...
}
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
...
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}
...
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
...
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
public long deadlineNanos() {
return deadlineNanos;
}
...
}
复制代码
9.Reactor线程处理任务队列之执行任务
(1)runAllTasks()方法需要传入超时时间
(2)Reactor线程执行任务的步骤
(3)Netty性能优化之批量策略
(4)NioEventLoop.run()方法执行任务总结
(1)runAllTasks()方法需要传入超时时间
SingleThreadEventExecutor的runAllTasks()方法需要传入参数timeoutNanos,表示尽量在timeoutNanos时间内将所有的任务都取出来执行一遍。因为如果Reactor线程在执行任务时停留的时间过长,那么将会累积许多IO事件无法及时处理,从而导致大量客户端请求阻塞。因此Netty会精细控制内部任务队列的执行时间。
(2)Reactor线程执行任务的步骤
一.任务聚合
转移定时任务到MPSC队列,这里只是将快到期的定时任务转移到MPSC队列里。
二.时间计算
计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC队列)里了。
三.任务执行
首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔64次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。
[code]//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //每一个NioEventLoop会有一个MPSC队列 private final Queue taskQueue; ... //Poll all tasks from the task queue and run them via Runnable#run() method. //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos. protected boolean runAllTasks(long timeoutNanos) { //1.转移定时任务到MPSC队列,也就是任务聚合 fetchFromScheduledTaskQueue(); //从普通的任务队列(MPSC队列)里获取任务 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } //2.计算本轮任务执行的截止时间 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //3.执行任务,通过for循环逐个执行pollTask()取出的任务 for (;;) { //3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行 safeExecute(task); //3.2 累加当前已执行的任务数 runTasks ++; //3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } //3.4 判断本轮任务是否已经执行完毕 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; } protected Runnable pollTask() { assert inEventLoop(); return pollTaskFrom(taskQueue); } protected final Runnable pollTaskFrom(Queue taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue; } return task; } } ...}//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos()
Netty
源码
Reactor
线程
模型
相关帖子
【Java】ThreadLocal源码解析
推荐一种并发线程中资源同步常用方法
UModel统一模型AIOps规模化难题
【Agent】MemOS 源码笔记---(5)---记忆分类
当你不再迷信“最强模型”,系统设计才刚刚开始
Gemini 3.0 Pro 迁移避坑指南:OpenAI API 无缝兼容,多模型协同靠 PoloAPI 更高效
线程池和高并发
Avalonia源码解读:Grid(网格控件)
JSAPIThree 加载单体三维模型学习笔记:SimpleModel 简易加载方式
Flink源码阅读:如何生成StreamGraph
回复
使用道具
举报
提升卡
置顶卡
沉默卡
喧嚣卡
变色卡
千斤顶
照妖镜
相关推荐
业界
【Java】ThreadLocal源码解析
0
149
祉遛吾
2025-12-13
安全
推荐一种并发线程中资源同步常用方法
0
542
汝雨竹
2025-12-15
科技
UModel统一模型AIOps规模化难题
0
944
终秀敏
2025-12-15
业界
【Agent】MemOS 源码笔记---(5)---记忆分类
0
99
能杜孱
2025-12-15
业界
当你不再迷信“最强模型”,系统设计才刚刚开始
0
858
聱嘹
2025-12-16
科技
Gemini 3.0 Pro 迁移避坑指南:OpenAI API 无缝兼容,多模型协同靠 PoloAPI 更高效
0
765
堠秉
2025-12-16
安全
线程池和高并发
0
574
荦绅诵
2025-12-16
业界
Avalonia源码解读:Grid(网格控件)
0
372
思矿戳
2025-12-17
业界
JSAPIThree 加载单体三维模型学习笔记:SimpleModel 简易加载方式
0
556
上官泰
2025-12-17
业界
Flink源码阅读:如何生成StreamGraph
0
336
梁丘艷蕙
2025-12-18
回复
(5)
彭水晶
2025-10-17 01:32:44
回复
使用道具
举报
照妖镜
程序园永久vip申请,500美金$,无限下载程序园所有程序/软件/数据/等
前排留名,哈哈哈
涅牵
2025-11-2 22:45:29
回复
使用道具
举报
照妖镜
程序园永久vip申请,500美金$,无限下载程序园所有程序/软件/数据/等
这个好,看起来很实用
呶募妙
2025-11-25 21:33:05
回复
使用道具
举报
照妖镜
猛犸象科技工作室:
网站开发,备案域名,渗透,服务器出租,DDOS/CC攻击,TG加粉引流
感谢发布原创作品,程序园因你更精彩
注思
2025-11-28 09:18:09
回复
使用道具
举报
照妖镜
猛犸象科技工作室:
网站开发,备案域名,渗透,服务器出租,DDOS/CC攻击,TG加粉引流
新版吗?好像是停更了吧。
钤凑讪
5 天前
回复
使用道具
举报
照妖镜
程序园永久vip申请,500美金$,无限下载程序园所有程序/软件/数据/等
不错,里面软件多更新就更好了
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
|
立即注册
回复
本版积分规则
回帖并转播
回帖后跳转到最后一页
签约作者
程序园优秀签约作者
发帖
但婆
5 天前
关注
0
粉丝关注
14
主题发布
板块介绍填写区域,请于后台编辑
财富榜{圆}
3934307807
991124
anyue1937
9994893
kk14977
6845358
4
xiangqian
638210
5
韶又彤
9997
6
宋子
9982
7
闰咄阅
9993
8
刎唇
9993
9
俞瑛瑶
9998
10
蓬森莉
9951
查看更多
今日好文热榜
793
Python包管理告别龟速下载:uv工具国内镜像
749
深入理解Linux IPIP隧道:原理、配置与实战
193
HoughLinesP 霍夫变换 C++ opencv 内存报
732
RabbitMQ发布订阅模式同一消费者多个实例如
797
AICube数据集不合法清洗解决方法
601
Iceberg 在hadoop大数据数据湖领域这么火
980
背包DP
436
echarts中appendData的详细讲解
607
C++ 原子操作解析
801
Python - UV 为每个项目创建独立、干净的Py
333
Flink源码阅读:如何生成StreamGraph
701
别再迷信“准确率”了!一文读懂 AI 图像分
106
ROS2概念之DDS
129
具身智能:零基础入门睿尔曼机械臂(四)—
396
Streamlit + LangChain 1.0 简单实现智能问
483
Oracle性能诊断与SQL优化:从9i到19c的技术
919
具身智能:零基础入门睿尔曼机械臂(五)—
222
NGD-SLAM(二)
399
[表单]HTML Learn Data Day 1
164
Oracle等待事件:性能诊断与优化的核心指南