大纲
1.关于NioEventLoop的问题整理
2.理解Reactor线程模型主要分三部分
3.NioEventLoop的创建
4.NioEventLoop的启动
1.关于NioEventLoop的问题整理
一.默认下Netty服务端起多少线程及何时启动?
答:默认是2倍CPU核数个线程。在调用EventExcutor的execute(task)方法时,会判断当前线程是否为Netty的Reactor线程,也就是判断当前线程是否为NioEventLoop对应的线程实体。如果是,则说明Netty的Reactor线程已经启动了。如果不是,则说明是外部线程调用EventExcutor的execute()方法。于是会先调用startThread()方法判断当前线程是否已被启动,如果还没有被启动就启动当前线程作为Netty的Reactor线程。
二.Netty是如何解决JDK空轮询的?
答:Netty会判断如果当前阻塞的一个Select()操作并没有花那么长时间,那么就说明此时有可能触发了空轮询Bug。默认情况下如果这个现象达到512次,那么就重建一个Selector,并且把之前Selector上所有的key重新移交到新Selector上。通过以上这种处理方式来避免JDK空轮询Bug。
三.Netty是如何保证异步串行无锁化的?
答:异步串行无锁化有两个场景。
场景一:拿到客户端一个Channel,不需要对该Channel进行同步,直接就可以多线程并发读写。
场景二:ChannelHandler里的所有操作都是线程安全的,不需要进行同步。
Netty在所有外部线程去调用EventLoop或者Channel的方法时,会通过inEventLoop()方法来判断出当前线程是外部线程(非NioEventLoop的线程实体)。在这种情况下,会把所有操作都封装成一个Task放入MPSC队列,然后在NioEventLoop的执行逻辑也就是run()方法里,这些Task会被逐个执行。
2.理解Reactor线程模型主要分三部分
一.NioEventLoop的创建
二.NioEventLoop的启动
三.NioEventLoop的执行
3.NioEventLoop的创建
(1)创建入口
(2)确定NioEventLoop的个数
(3)NioEventLoopGroup的创建流程
(4)创建线程执行器ThreadPerTaskExecutor
(5)创建NioEventLoop
(6)创建线程选择器EventExecutorChooser
(7)NioEventLoopGroup的创建总结
(1)创建入口- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码 (2)确定NioEventLoop的个数
由NioEventLoopGroup的构造方法来确定NioEventLoop的个数。如果NioEventLoopGroup没有传递构造参数,那么NioEventLoop线程的个数为CPU核数的2倍。如果NioEventLoopGroup传递了参数n,那么NioEventLoop线程的个数就是n。
(3)NioEventLoopGroup的创建流程
NioEventLoopGroup的构造方法会触发创建流程。
一.创建线程执行器ThreadPerTaskExecutor
每次调用ThreadPerTaskExecutor.execute()方法时都会创建一个线程。
二.创建NioEventLoop
NioEventLoop对应NioEventLoopGroup线程池里的线程,NioEventLoopGroup的构造方法会用一个for循环通过调用newChild()方法来创建NioEventLoop线程。
三.创建线程选择器EventExecutorChooser
线程选择器的作用是用于给每个新连接分配一个NioEventLoop线程,也就是从NioEventLoopGroup线程池中选择一个NioEventLoop线程来处理新连接。- //MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
- public class NioEventLoopGroup extends MultithreadEventLoopGroup {
- //Create a new instance using the default number of threads,
- //the default ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
- public NioEventLoopGroup() {
- this(0);
- }
-
- //Create a new instance using the specified number of threads,
- //ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
- public NioEventLoopGroup(int nThreads) {
- this(nThreads, (Executor) null);
- }
-
- public NioEventLoopGroup(int nThreads, Executor executor) {
- this(nThreads, executor, SelectorProvider.provider());
- }
-
- public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
- this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
- }
-
- public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
- super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
- }
- ...
- }
- //Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
- public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
- private static final int DEFAULT_EVENT_LOOP_THREADS;
- static {
- DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
- if (logger.isDebugEnabled()) logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
- }
- protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
- super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
- }
- ...
- }
- //Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
- public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
- private final EventExecutor[] children;
- private final EventExecutorChooserFactory.EventExecutorChooser chooser;
- ...
- //Create a new instance.
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
- this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
- }
-
- //Create a new instance.
- //@param nThreads,the number of threads that will be used by this instance.
- //@param executor,the Executor to use, or null if the default should be used.
- //@param chooserFactory,the EventExecutorChooserFactory to use.
- //@param args,arguments which will passed to each #newChild(Executor, Object...) call
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
- if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
- //1.创建ThreadPerTaskExecutor线程执行器
- if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- //2.创建NioEventLoop
- children = new EventExecutor[nThreads];
- for (int i = 0; i < nThreads; i ++) {
- ...
- //创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
- //传入线程执行器executor去创建NioEventLoop
- children[i] = newChild(executor, args);
- }
- //3.创建线程选择器
- chooser = chooserFactory.newChooser(children);
- ...
- }
- ...
- }
复制代码 (5)创建NioEventLoop
说明一:
由MultithreadEventExecutorGroup的构造方法可知,Netty会使用for循环 + newChild()方法来创建nThreads个NioEventLoop,而且一个NioEventLoop对应一个线程实体FastThreadLocalThread。- new NioEventLoopGroup() //线程组,线程个数默认为2 * CPU核数
- new ThreadPerTaskExecutor() //创建线程执行器,作用是负责创建NioEventLoop对应的线程
- for(...) { newChild() } //构造NioEventLoop,创建NioEventLoop线程组
- chooserFactory.newChooser() //线程选择器,用于给每个新连接分配一个NioEventLoop线程
复制代码 说明四:
NioEventLoop的构造方法还会调用其父类的父类SingleThreadEventExecutor的构造方法。SingleThreadEventExecutor的构造方法里有两个关键的操作:一是把线程执行器保存起来,因为后面创建NioEventLoop对应的线程时要用到。二是创建一个MPSC任务队列,因为Netty中所有异步执行的本质都是通过该任务队列来协调完成的。- //Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
- public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
- private final EventExecutor[] children;
- private final EventExecutorChooserFactory.EventExecutorChooser chooser;
- ...
- //Create a new instance.
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
- if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
- //1.创建ThreadPerTaskExecutor线程执行器
- if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- //2.创建NioEventLoop
- children = new EventExecutor[nThreads];
- for (int i = 0; i < nThreads; i ++) {
- ...
- //创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
- //传入线程执行器executor去创建NioEventLoop
- children[i] = newChild(executor, args);
- }
- //3.创建线程选择器
- chooser = chooserFactory.newChooser(children);
- ...
- }
-
- protected ThreadFactory newDefaultThreadFactory() {
- //getClass()是获取该方法所属的对象类型,也就是NioEventLoopGroup类型
- //因为是通过NioEventLoopGroup的构造方法层层调用到这里的
- return new DefaultThreadFactory(getClass());
- }
- ...
- }
- public final class ThreadPerTaskExecutor implements Executor {
- private final ThreadFactory threadFactory;
- public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
- if (threadFactory == null) throw new NullPointerException("threadFactory");
- this.threadFactory = threadFactory;
- }
-
- @Override
- public void execute(Runnable command) {
- //调用DefaultThreadFactory的newThread()方法执行Runnable任务
- threadFactory.newThread(command).start();
- }
- }
- //A ThreadFactory implementation with a simple naming rule.
- public class DefaultThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolId = new AtomicInteger();
- private final AtomicInteger nextId = new AtomicInteger();
- private final boolean daemon;
- private final int priority;
- protected final ThreadGroup threadGroup;
- ...
- public DefaultThreadFactory(Class<?> poolType) {
- this(poolType, false, Thread.NORM_PRIORITY);
- }
-
- public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
- //toPoolName()方法会把NioEventLoopGroup的首字母变成小写
- this(toPoolName(poolType), daemon, priority);
- }
-
- public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
- this(poolName, daemon, priority,
- System.getSecurityManager() == null? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
- }
-
- public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
- ...
- //prefix用来标记线程名字的前缀
- prefix = poolName + '-' + poolId.incrementAndGet() + '-';
- this.daemon = daemon;
- this.priority = priority;
- this.threadGroup = threadGroup;
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
- if (t.isDaemon()) {
- if (!daemon) t.setDaemon(false);
- } else {
- if (daemon) t.setDaemon(true);
- }
- if (t.getPriority() != priority) t.setPriority(priority);
- return t;
- }
-
- protected Thread newThread(Runnable r, String name) {
- return new FastThreadLocalThread(threadGroup, r, name);
- }
- ...
- }
复制代码 MPSC队列也就是多生产者单消费者队列。单消费者是指某个NioEventLoop对应的线程(执行其run()方法的那个线程)。多生产者就是这个NioEventLoop对应的线程之外的线程,通常情况下就是我们的业务线程。比如,一些线程在调用writeAndFlush()方法时可以不用考虑线程安全而随意调用,那么这些线程就是多生产者。
MPSC队列是通过JCTools这个工具包来实现的,Netty的高性能很大程度上要归功于这个工具包。MPSC的全称是Muti Producer Single Consumer。Muti Producer对应的是外部线程,Single Consumer对应的是Netty的NioEventLoop线程。外部线程在执行Netty的一些任务时,如果判断不是由NioEventLoop对应的线程执行的,就会直接放入一个任务队列里,然后由一个NioEventLoop对应的线程去执行。
创建NioEventLoop总结:
NioEventLoopGroup的newChild()方法创建NioEventLoop时做了三项事情:一.创建一个Selector用于轮询注册到该NioEventLoop上的连接,二.创建一个MPSC任务队列,三.保存线程执行器到NioEventLoop。
(6)创建线程选择器EventExecutorChooser
说明一:
在传统的BIO编程中,一个新连接被创建后,通常需要给这个连接绑定一个Selector,之后这个连接的整个生命周期都由这个Selector管理。
说明二:
创建NioEventLoop时会创建一个Selector,所以一个Selector会对应一个NioEventLoop,一个NioEventLoop上会有一个Selector。线程选择器的作用就是为一个连接在NioEventLoopGroup中选择一个NioEventLoop,从而将该连接绑定到这个NioEventLoop的Selector上。
说明三:
根据MultithreadEventExecutorGroup的构造方法,会使用DefaultEventExecutorChooserFactory的newChooser()方法来创建线程选择器。创建好线程选择器EventExecutorChooser之后,便可以通过其next()方法获取一个NioEventLoop。
Netty通过判断NioEventLoopGroup中的NioEventLoop个数是否是2的幂来创建不同的线程选择器。但不管是哪一种选择器,最终效果都是从第一个NioEventLoop开始遍历到最后一个NioEventLoop,然后再从第一个NioEventLoop开始,如此循环。
[code]//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children; private final EventExecutorChooserFactory.EventExecutorChooser chooser; ... //Create a new instance. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } //Create a new instance. //@param nThreads,the number of threads that will be used by this instance. //@param executor,the Executor to use, or null if the default should be used. //@param chooserFactory,the EventExecutorChooserFactory to use. //@param args,arguments which will passed to each #newChild(Executor, Object...) call protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads |