大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
5.NioEventLoop的执行总体框架
(1)Reactor线程所做的三件事情
(2)处理多久IO事件就执行多久任务
(3)NioEventLoop.run()方法的执行流程
(1)Reactor线程所做的三件事情
NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。
一.首先是调用select()方法进行一次事件轮询
由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。
二.然后调用processSelectedKeys()方法处理轮询出来的IO事件
三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务- //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
- public final class NioEventLoop extends SingleThreadEventLoop {
- private volatile int ioRatio = 50;
- ...
- @Override
- protected void run() {
- for (;;) {
- ...
- //1.调用select()方法执行一次事件轮询
- select(wakenUp.getAndSet(false));
- if (wakenUp.get()) {
- selector.wakeup();
- }
- ...
- //2.处理产生IO事件的Channel
- processSelectedKeys();
- ...
- //3.执行外部线程放入TaskQueue的任务
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
- }
- }
- private void select(boolean oldWakenUp) throws IOException {
- for(;;) {
- //1.定时任务截止时间快到了,中断本次轮询
- //2.轮询过程中发现有任务加入,中断本次轮询
- //3.阻塞式select操作: selector.select(timeoutMills)
- //4.避免JDK空轮询Bug
- }
- }
- ...
- }
复制代码 (2)处理多久IO事件就执行多久任务
在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。- //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
- public final class NioEventLoop extends SingleThreadEventLoop {
- private volatile int ioRatio = 50;
- ...
- @Override
- protected void run() {
- for (;;) {
- ...
- //1.调用select()方法执行一次事件轮询
- select(wakenUp.getAndSet(false));
- if (wakenUp.get()) {
- selector.wakeup();
- }
- ...
- final int ioRatio = this.ioRatio;
- if (ioRatio == 100) {
- try {
- processSelectedKeys();
- } finally {
- // Ensure we always run tasks.
- runAllTasks();
- }
- } else {
- final long ioStartTime = System.nanoTime();
- try {
- processSelectedKeys();
- } finally {
- // Ensure we always run tasks.
- final long ioTime = System.nanoTime() - ioStartTime;
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
- }
- }
- ...
- }
- }
- ...
- }
复制代码 (3)NioEventLoop.run()方法的执行流程- NioEventLoop.run() -> for(;;)
- select() //执行一次事件轮询检查是否有IO事件
- processSelectedKeys() //处理产生IO事件的Channel
- runAllTasks() //处理异步任务队列
- //这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的
复制代码
6.Reactor线程执行一次事件轮询
(1)执行select操作前设置wakeUp变量
(2)定时任务快开始了则中断本次轮询
(3)轮询中发现有任务加入则中断本次轮询
(4)执行阻塞式select操作
(5)避免JDK的空轮询Bug
(6)执行一次事件轮询的总结
(1)执行select操作前设置wakeUp变量
NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。- //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
- public final class NioEventLoop extends SingleThreadEventLoop {
- //Boolean that controls determines if a blocked Selector.select should break out of its selection process.
- //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
- private final AtomicBoolean wakenUp = new AtomicBoolean();
- ...
- @Override
- protected void run() {
- for (;;) {
- ...
- //1.调用select()方法执行一次事件轮询
- select(wakenUp.getAndSet(false));
- if (wakenUp.get()) {
- selector.wakeup();
- }
- ...
- }
- }
- ...
- }
复制代码 如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。- //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
- public final class NioEventLoop extends SingleThreadEventLoop {
- Selector selector;
- ...
- private void select(boolean oldWakenUp) throws IOException {
- Selector selector = this.selector;
- for(;;) {
- //1.定时任务截止时间快到了,中断本次轮询
- //2.轮询过程中发现有任务加入,中断本次轮询
- //3.阻塞式select操作: selector.select(timeoutMills)
- //4.避免JDK空轮询Bug
- }
- }
- ...
- }
复制代码 (2)定时任务快开始了则中断本次轮询
NioEventLoop中的Reactor线程的select操作也是一个for循环。
在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。
Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。- //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
- public final class NioEventLoop extends SingleThreadEventLoop {
- Selector selector;
- ...
- private void select(boolean oldWakenUp) throws IOException {
- Selector selector = this.selector;
- int selectCnt = 0;
- long currentTimeNanos = System.nanoTime();//当前时间
- long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
- for(;;) {
- //1.定时任务截止时间快到了,中断本次轮询
- long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
- if (timeoutMillis <= 0) {
- if (selectCnt == 0) {
- selector.selectNow();//非阻塞执行select操作
- selectCnt = 1;
- }
- break;
- }
- ...
- }
- }
- ...
- }
- //Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
- public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
- ...
- protected long delayNanos(long currentTimeNanos) {
- ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
- if (scheduledTask == null) {
- return SCHEDULE_PURGE_INTERVAL;
- }
- return scheduledTask.delayNanos(currentTimeNanos);
- }
- ...
- }
- //Abstract base class for EventExecutors that want to support scheduling.
- public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
- Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
- ...
- final ScheduledFutureTask<?> peekScheduledTask() {
- Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
- if (scheduledTaskQueue == null) {
- return null;
- }
- return scheduledTaskQueue.peek();
- }
- ...
- }
- final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
- ...
- public long delayNanos(long currentTimeNanos) {
- return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
- }
- public long deadlineNanos() {
- return deadlineNanos;
- }
- ...
- }
复制代码
9.Reactor线程处理任务队列之执行任务
(1)runAllTasks()方法需要传入超时时间
(2)Reactor线程执行任务的步骤
(3)Netty性能优化之批量策略
(4)NioEventLoop.run()方法执行任务总结
(1)runAllTasks()方法需要传入超时时间
SingleThreadEventExecutor的runAllTasks()方法需要传入参数timeoutNanos,表示尽量在timeoutNanos时间内将所有的任务都取出来执行一遍。因为如果Reactor线程在执行任务时停留的时间过长,那么将会累积许多IO事件无法及时处理,从而导致大量客户端请求阻塞。因此Netty会精细控制内部任务队列的执行时间。
(2)Reactor线程执行任务的步骤
一.任务聚合
转移定时任务到MPSC队列,这里只是将快到期的定时任务转移到MPSC队列里。
二.时间计算
计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC队列)里了。
三.任务执行
首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔64次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。
[code]//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //每一个NioEventLoop会有一个MPSC队列 private final Queue taskQueue; ... //Poll all tasks from the task queue and run them via Runnable#run() method. //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos. protected boolean runAllTasks(long timeoutNanos) { //1.转移定时任务到MPSC队列,也就是任务聚合 fetchFromScheduledTaskQueue(); //从普通的任务队列(MPSC队列)里获取任务 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } //2.计算本轮任务执行的截止时间 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //3.执行任务,通过for循环逐个执行pollTask()取出的任务 for (;;) { //3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行 safeExecute(task); //3.2 累加当前已执行的任务数 runTasks ++; //3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } //3.4 判断本轮任务是否已经执行完毕 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; } protected Runnable pollTask() { assert inEventLoop(); return pollTaskFrom(taskQueue); } protected final Runnable pollTaskFrom(Queue taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue; } return task; } } ...}//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() |