找回密码
 立即注册
首页 业界区 业界 Netty源码—3.Reactor线程模型二

Netty源码—3.Reactor线程模型二

但婆 4 天前
大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
 
5.NioEventLoop的执行总体框架
(1)Reactor线程所做的三件事情
(2)处理多久IO事件就执行多久任务
(3)NioEventLoop.run()方法的执行流程
 
(1)Reactor线程所做的三件事情
NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。
 
一.首先是调用select()方法进行一次事件轮询
由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。
 
二.然后调用processSelectedKeys()方法处理轮询出来的IO事件
 
三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务
  1. //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
  2. public final class NioEventLoop extends SingleThreadEventLoop {
  3.     private volatile int ioRatio = 50;
  4.     ...
  5.     @Override
  6.     protected void run() {
  7.         for (;;) {
  8.             ...
  9.             //1.调用select()方法执行一次事件轮询
  10.             select(wakenUp.getAndSet(false));
  11.             if (wakenUp.get()) {
  12.                 selector.wakeup();
  13.             }
  14.             ...
  15.             //2.处理产生IO事件的Channel
  16.             processSelectedKeys();
  17.             ...
  18.             //3.执行外部线程放入TaskQueue的任务
  19.             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  20.         }
  21.     }
  22.     private void select(boolean oldWakenUp) throws IOException {
  23.         for(;;) {
  24.             //1.定时任务截止时间快到了,中断本次轮询
  25.             //2.轮询过程中发现有任务加入,中断本次轮询
  26.             //3.阻塞式select操作: selector.select(timeoutMills)
  27.             //4.避免JDK空轮询Bug
  28.         }
  29.     }
  30.     ...
  31. }
复制代码
(2)处理多久IO事件就执行多久任务
在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。
  1. //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
  2. public final class NioEventLoop extends SingleThreadEventLoop {
  3.     private volatile int ioRatio = 50;
  4.     ...
  5.     @Override
  6.     protected void run() {
  7.         for (;;) {
  8.             ...
  9.             //1.调用select()方法执行一次事件轮询
  10.             select(wakenUp.getAndSet(false));
  11.             if (wakenUp.get()) {
  12.                 selector.wakeup();
  13.             }
  14.             ...
  15.             final int ioRatio = this.ioRatio;
  16.             if (ioRatio == 100) {
  17.                 try {
  18.                     processSelectedKeys();
  19.                 } finally {
  20.                     // Ensure we always run tasks.
  21.                     runAllTasks();
  22.                 }
  23.             } else {
  24.                 final long ioStartTime = System.nanoTime();
  25.                 try {
  26.                     processSelectedKeys();
  27.                 } finally {
  28.                     // Ensure we always run tasks.
  29.                     final long ioTime = System.nanoTime() - ioStartTime;
  30.                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  31.                 }
  32.             }
  33.             ...
  34.         }
  35.     }
  36.     ...
  37. }
复制代码
(3)NioEventLoop.run()方法的执行流程
  1. NioEventLoop.run() -> for(;;)
  2.   select() //执行一次事件轮询检查是否有IO事件
  3.   processSelectedKeys() //处理产生IO事件的Channel
  4.   runAllTasks() //处理异步任务队列
  5. //这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的
复制代码
 
6.Reactor线程执行一次事件轮询
(1)执行select操作前设置wakeUp变量
(2)定时任务快开始了则中断本次轮询
(3)轮询中发现有任务加入则中断本次轮询
(4)执行阻塞式select操作
(5)避免JDK的空轮询Bug
(6)执行一次事件轮询的总结
 
(1)执行select操作前设置wakeUp变量
NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。
  1. //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
  2. public final class NioEventLoop extends SingleThreadEventLoop {
  3.     //Boolean that controls determines if a blocked Selector.select should break out of its selection process.
  4.     //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
  5.     private final AtomicBoolean wakenUp = new AtomicBoolean();
  6.     ...
  7.     @Override
  8.     protected void run() {
  9.         for (;;) {
  10.             ...
  11.             //1.调用select()方法执行一次事件轮询
  12.             select(wakenUp.getAndSet(false));
  13.             if (wakenUp.get()) {
  14.                 selector.wakeup();
  15.             }
  16.             ...
  17.         }
  18.     }
  19.     ...
  20. }
复制代码
如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。
  1. //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
  2. public final class NioEventLoop extends SingleThreadEventLoop {
  3.     Selector selector;
  4.     ...
  5.     private void select(boolean oldWakenUp) throws IOException {
  6.         Selector selector = this.selector;
  7.         for(;;) {
  8.             //1.定时任务截止时间快到了,中断本次轮询
  9.             //2.轮询过程中发现有任务加入,中断本次轮询
  10.             //3.阻塞式select操作: selector.select(timeoutMills)
  11.             //4.避免JDK空轮询Bug
  12.         }
  13.     }
  14.     ...
  15. }
复制代码
(2)定时任务快开始了则中断本次轮询
NioEventLoop中的Reactor线程的select操作也是一个for循环。
 
在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。
 
Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。
  1. //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
  2. public final class NioEventLoop extends SingleThreadEventLoop {
  3.     Selector selector;
  4.     ...
  5.     private void select(boolean oldWakenUp) throws IOException {
  6.         Selector selector = this.selector;
  7.         int selectCnt = 0;
  8.         long currentTimeNanos = System.nanoTime();//当前时间
  9.         long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
  10.         for(;;) {
  11.             //1.定时任务截止时间快到了,中断本次轮询
  12.             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
  13.             if (timeoutMillis <= 0) {
  14.                 if (selectCnt == 0) {
  15.                     selector.selectNow();//非阻塞执行select操作
  16.                     selectCnt = 1;
  17.                 }
  18.                 break;
  19.             }
  20.             ...
  21.         }
  22.     }
  23.     ...
  24. }
  25. //Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
  26. public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
  27.     ...
  28.     protected long delayNanos(long currentTimeNanos) {
  29.         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
  30.         if (scheduledTask == null) {
  31.             return SCHEDULE_PURGE_INTERVAL;
  32.         }
  33.         return scheduledTask.delayNanos(currentTimeNanos);
  34.     }
  35.     ...
  36. }
  37. //Abstract base class for EventExecutors that want to support scheduling.
  38. public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
  39.     Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
  40.     ...
  41.     final ScheduledFutureTask<?> peekScheduledTask() {
  42.         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
  43.         if (scheduledTaskQueue == null) {
  44.             return null;
  45.         }
  46.         return scheduledTaskQueue.peek();
  47.     }
  48.     ...
  49. }
  50. final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
  51.     ...
  52.     public long delayNanos(long currentTimeNanos) {
  53.         return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
  54.     }
  55.     public long deadlineNanos() {
  56.         return deadlineNanos;
  57.     }
  58.     ...
  59. }
复制代码
 
9.Reactor线程处理任务队列之执行任务
(1)runAllTasks()方法需要传入超时时间
(2)Reactor线程执行任务的步骤
(3)Netty性能优化之批量策略
(4)NioEventLoop.run()方法执行任务总结
 
(1)runAllTasks()方法需要传入超时时间
SingleThreadEventExecutor的runAllTasks()方法需要传入参数timeoutNanos,表示尽量在timeoutNanos时间内将所有的任务都取出来执行一遍。因为如果Reactor线程在执行任务时停留的时间过长,那么将会累积许多IO事件无法及时处理,从而导致大量客户端请求阻塞。因此Netty会精细控制内部任务队列的执行时间。
 
(2)Reactor线程执行任务的步骤
一.任务聚合
转移定时任务到MPSC队列,这里只是将快到期的定时任务转移到MPSC队列里。
 
二.时间计算
计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC队列)里了。
 
三.任务执行
首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔64次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。
[code]//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {    //每一个NioEventLoop会有一个MPSC队列    private final Queue taskQueue;    ...        //Poll all tasks from the task queue and run them via Runnable#run() method.    //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos.    protected boolean runAllTasks(long timeoutNanos) {        //1.转移定时任务到MPSC队列,也就是任务聚合        fetchFromScheduledTaskQueue();        //从普通的任务队列(MPSC队列)里获取任务        Runnable task = pollTask();        if (task == null) {            afterRunningAllTasks();            return false;        }        //2.计算本轮任务执行的截止时间        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;        long runTasks = 0;        long lastExecutionTime;        //3.执行任务,通过for循环逐个执行pollTask()取出的任务        for (;;) {            //3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行            safeExecute(task);            //3.2 累加当前已执行的任务数            runTasks ++;            //3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的            if ((runTasks & 0x3F) == 0) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                if (lastExecutionTime >= deadline) {                    break;                }            }            //3.4 判断本轮任务是否已经执行完毕            task = pollTask();            if (task == null) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                break;            }        }        afterRunningAllTasks();        this.lastExecutionTime = lastExecutionTime;        return true;    }        private boolean fetchFromScheduledTaskQueue() {        long nanoTime = AbstractScheduledEventExecutor.nanoTime();        Runnable scheduledTask  = pollScheduledTask(nanoTime);        while (scheduledTask != null) {            if (!taskQueue.offer(scheduledTask)) {                scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask);                return false;            }            scheduledTask  = pollScheduledTask(nanoTime);        }        return true;    }        protected Runnable pollTask() {        assert inEventLoop();        return pollTaskFrom(taskQueue);    }    protected final Runnable pollTaskFrom(Queue taskQueue) {        for (;;) {            Runnable task = taskQueue.poll();            if (task == WAKEUP_TASK) {                continue;            }            return task;        }    }    ...}//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {    Queue> scheduledTaskQueue = this.scheduledTaskQueue;        ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();        if (scheduledTask == null) {            return null;        }        if (scheduledTask.deadlineNanos()
您需要登录后才可以回帖 登录 | 立即注册