找回密码
 立即注册
首页 业界区 业界 Netty源码—2.Reactor线程模型一

Netty源码—2.Reactor线程模型一

站竣凰 4 天前
大纲
1.关于NioEventLoop的问题整理
2.理解Reactor线程模型主要分三部分
3.NioEventLoop的创建
4.NioEventLoop的启动
 
1.关于NioEventLoop的问题整理
一.默认下Netty服务端起多少线程及何时启动?
答:默认是2倍CPU核数个线程。在调用EventExcutor的execute(task)方法时,会判断当前线程是否为Netty的Reactor线程,也就是判断当前线程是否为NioEventLoop对应的线程实体。如果是,则说明Netty的Reactor线程已经启动了。如果不是,则说明是外部线程调用EventExcutor的execute()方法。于是会先调用startThread()方法判断当前线程是否已被启动,如果还没有被启动就启动当前线程作为Netty的Reactor线程。
 
二.Netty是如何解决JDK空轮询的?
答:Netty会判断如果当前阻塞的一个Select()操作并没有花那么长时间,那么就说明此时有可能触发了空轮询Bug。默认情况下如果这个现象达到512次,那么就重建一个Selector,并且把之前Selector上所有的key重新移交到新Selector上。通过以上这种处理方式来避免JDK空轮询Bug。
 
三.Netty是如何保证异步串行无锁化的?
答:异步串行无锁化有两个场景。
场景一:拿到客户端一个Channel,不需要对该Channel进行同步,直接就可以多线程并发读写。
场景二:ChannelHandler里的所有操作都是线程安全的,不需要进行同步。
 
Netty在所有外部线程去调用EventLoop或者Channel的方法时,会通过inEventLoop()方法来判断出当前线程是外部线程(非NioEventLoop的线程实体)。在这种情况下,会把所有操作都封装成一个Task放入MPSC队列,然后在NioEventLoop的执行逻辑也就是run()方法里,这些Task会被逐个执行。
 
2.理解Reactor线程模型主要分三部分
一.NioEventLoop的创建
二.NioEventLoop的启动
三.NioEventLoop的执行
 
3.NioEventLoop的创建
(1)创建入口
(2)确定NioEventLoop的个数
(3)NioEventLoopGroup的创建流程
(4)创建线程执行器ThreadPerTaskExecutor
(5)创建NioEventLoop
(6)创建线程选择器EventExecutorChooser
(7)NioEventLoopGroup的创建总结
 
(1)创建入口
  1. EventLoopGroup bossGroup = new NioEventLoopGroup();
  2. EventLoopGroup workerGroup = new NioEventLoopGroup();
复制代码
(2)确定NioEventLoop的个数
由NioEventLoopGroup的构造方法来确定NioEventLoop的个数。如果NioEventLoopGroup没有传递构造参数,那么NioEventLoop线程的个数为CPU核数的2倍。如果NioEventLoopGroup传递了参数n,那么NioEventLoop线程的个数就是n。
 
(3)NioEventLoopGroup的创建流程
NioEventLoopGroup的构造方法会触发创建流程。
一.创建线程执行器ThreadPerTaskExecutor
每次调用ThreadPerTaskExecutor.execute()方法时都会创建一个线程。
二.创建NioEventLoop
NioEventLoop对应NioEventLoopGroup线程池里的线程,NioEventLoopGroup的构造方法会用一个for循环通过调用newChild()方法来创建NioEventLoop线程。
三.创建线程选择器EventExecutorChooser
线程选择器的作用是用于给每个新连接分配一个NioEventLoop线程,也就是从NioEventLoopGroup线程池中选择一个NioEventLoop线程来处理新连接。
  1. //MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
  2. public class NioEventLoopGroup extends MultithreadEventLoopGroup {
  3.     //Create a new instance using the default number of threads,
  4.     //the default ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
  5.     public NioEventLoopGroup() {
  6.         this(0);
  7.     }
  8.    
  9.     //Create a new instance using the specified number of threads,
  10.     //ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
  11.     public NioEventLoopGroup(int nThreads) {
  12.         this(nThreads, (Executor) null);
  13.     }
  14.    
  15.     public NioEventLoopGroup(int nThreads, Executor executor) {
  16.         this(nThreads, executor, SelectorProvider.provider());
  17.     }
  18.    
  19.     public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
  20.         this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
  21.     }
  22.    
  23.     public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
  24.         super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
  25.     }
  26.     ...
  27. }
  28. //Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
  29. public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
  30.     private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
  31.     private static final int DEFAULT_EVENT_LOOP_THREADS;
  32.     static {
  33.         DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
  34.         if (logger.isDebugEnabled()) logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
  35.     }
  36.     protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  37.         super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  38.     }
  39.     ...
  40. }
  41. //Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
  42. public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
  43.     private final EventExecutor[] children;
  44.     private final EventExecutorChooserFactory.EventExecutorChooser chooser;
  45.     ...
  46.     //Create a new instance.
  47.     protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
  48.         this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  49.     }
  50.    
  51.     //Create a new instance.
  52.     //@param nThreads,the number of threads that will be used by this instance.
  53.     //@param executor,the Executor to use, or null if the default should be used.
  54.     //@param chooserFactory,the EventExecutorChooserFactory to use.
  55.     //@param args,arguments which will passed to each #newChild(Executor, Object...) call
  56.     protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
  57.         if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  58.         //1.创建ThreadPerTaskExecutor线程执行器
  59.         if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  60.         //2.创建NioEventLoop
  61.         children = new EventExecutor[nThreads];
  62.         for (int i = 0; i < nThreads; i ++) {
  63.             ...
  64.             //创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
  65.             //传入线程执行器executor去创建NioEventLoop
  66.             children[i] = newChild(executor, args);
  67.         }
  68.         //3.创建线程选择器
  69.         chooser = chooserFactory.newChooser(children);
  70.         ...
  71.     }
  72.     ...
  73. }
复制代码
(5)创建NioEventLoop
说明一:
由MultithreadEventExecutorGroup的构造方法可知,Netty会使用for循环 + newChild()方法来创建nThreads个NioEventLoop,而且一个NioEventLoop对应一个线程实体FastThreadLocalThread。
  1. new NioEventLoopGroup() //线程组,线程个数默认为2 * CPU核数
  2.   new ThreadPerTaskExecutor() //创建线程执行器,作用是负责创建NioEventLoop对应的线程
  3.   for(...) { newChild() } //构造NioEventLoop,创建NioEventLoop线程组
  4.   chooserFactory.newChooser() //线程选择器,用于给每个新连接分配一个NioEventLoop线程
复制代码
说明四:
NioEventLoop的构造方法还会调用其父类的父类SingleThreadEventExecutor的构造方法。SingleThreadEventExecutor的构造方法里有两个关键的操作:一是把线程执行器保存起来,因为后面创建NioEventLoop对应的线程时要用到。二是创建一个MPSC任务队列,因为Netty中所有异步执行的本质都是通过该任务队列来协调完成的。
  1. //Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
  2. public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
  3.     private final EventExecutor[] children;
  4.     private final EventExecutorChooserFactory.EventExecutorChooser chooser;
  5.     ...   
  6.     //Create a new instance.
  7.     protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
  8.         if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  9.         //1.创建ThreadPerTaskExecutor线程执行器
  10.         if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  11.         //2.创建NioEventLoop
  12.         children = new EventExecutor[nThreads];
  13.         for (int i = 0; i < nThreads; i ++) {
  14.             ...
  15.             //创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
  16.             //传入线程执行器executor去创建NioEventLoop
  17.             children[i] = newChild(executor, args);
  18.         }
  19.         //3.创建线程选择器
  20.         chooser = chooserFactory.newChooser(children);
  21.         ...
  22.     }
  23.    
  24.     protected ThreadFactory newDefaultThreadFactory() {
  25.         //getClass()是获取该方法所属的对象类型,也就是NioEventLoopGroup类型
  26.         //因为是通过NioEventLoopGroup的构造方法层层调用到这里的
  27.         return new DefaultThreadFactory(getClass());
  28.     }
  29.     ...
  30. }
  31. public final class ThreadPerTaskExecutor implements Executor {
  32.     private final ThreadFactory threadFactory;
  33.     public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
  34.         if (threadFactory == null) throw new NullPointerException("threadFactory");
  35.         this.threadFactory = threadFactory;
  36.     }
  37.    
  38.     @Override
  39.     public void execute(Runnable command) {
  40.         //调用DefaultThreadFactory的newThread()方法执行Runnable任务
  41.         threadFactory.newThread(command).start();
  42.     }
  43. }
  44. //A ThreadFactory implementation with a simple naming rule.
  45. public class DefaultThreadFactory implements ThreadFactory {
  46.     private static final AtomicInteger poolId = new AtomicInteger();
  47.     private final AtomicInteger nextId = new AtomicInteger();
  48.     private final boolean daemon;
  49.     private final int priority;
  50.     protected final ThreadGroup threadGroup;
  51.     ...
  52.     public DefaultThreadFactory(Class<?> poolType) {
  53.         this(poolType, false, Thread.NORM_PRIORITY);
  54.     }
  55.    
  56.     public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
  57.         //toPoolName()方法会把NioEventLoopGroup的首字母变成小写
  58.         this(toPoolName(poolType), daemon, priority);
  59.     }
  60.    
  61.     public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
  62.         this(poolName, daemon, priority,
  63.         System.getSecurityManager() == null? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
  64.     }
  65.    
  66.     public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
  67.         ...
  68.         //prefix用来标记线程名字的前缀
  69.         prefix = poolName + '-' + poolId.incrementAndGet() + '-';
  70.         this.daemon = daemon;
  71.         this.priority = priority;
  72.         this.threadGroup = threadGroup;
  73.     }
  74.    
  75.     @Override
  76.     public Thread newThread(Runnable r) {
  77.         Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
  78.         if (t.isDaemon()) {
  79.             if (!daemon) t.setDaemon(false);
  80.         } else {
  81.             if (daemon) t.setDaemon(true);
  82.         }
  83.         if (t.getPriority() != priority) t.setPriority(priority);
  84.         return t;
  85.     }
  86.    
  87.     protected Thread newThread(Runnable r, String name) {
  88.         return new FastThreadLocalThread(threadGroup, r, name);
  89.     }
  90.     ...
  91. }
复制代码
MPSC队列也就是多生产者单消费者队列。单消费者是指某个NioEventLoop对应的线程(执行其run()方法的那个线程)。多生产者就是这个NioEventLoop对应的线程之外的线程,通常情况下就是我们的业务线程。比如,一些线程在调用writeAndFlush()方法时可以不用考虑线程安全而随意调用,那么这些线程就是多生产者。
 
MPSC队列是通过JCTools这个工具包来实现的,Netty的高性能很大程度上要归功于这个工具包。MPSC的全称是Muti Producer Single Consumer。Muti Producer对应的是外部线程,Single Consumer对应的是Netty的NioEventLoop线程。外部线程在执行Netty的一些任务时,如果判断不是由NioEventLoop对应的线程执行的,就会直接放入一个任务队列里,然后由一个NioEventLoop对应的线程去执行。
 
创建NioEventLoop总结:
NioEventLoopGroup的newChild()方法创建NioEventLoop时做了三项事情:一.创建一个Selector用于轮询注册到该NioEventLoop上的连接,二.创建一个MPSC任务队列,三.保存线程执行器到NioEventLoop。
 
(6)创建线程选择器EventExecutorChooser
说明一:
在传统的BIO编程中,一个新连接被创建后,通常需要给这个连接绑定一个Selector,之后这个连接的整个生命周期都由这个Selector管理。
 
说明二:
创建NioEventLoop时会创建一个Selector,所以一个Selector会对应一个NioEventLoop,一个NioEventLoop上会有一个Selector。线程选择器的作用就是为一个连接在NioEventLoopGroup中选择一个NioEventLoop,从而将该连接绑定到这个NioEventLoop的Selector上。
 
说明三:
根据MultithreadEventExecutorGroup的构造方法,会使用DefaultEventExecutorChooserFactory的newChooser()方法来创建线程选择器。创建好线程选择器EventExecutorChooser之后,便可以通过其next()方法获取一个NioEventLoop。
 
Netty通过判断NioEventLoopGroup中的NioEventLoop个数是否是2的幂来创建不同的线程选择器。但不管是哪一种选择器,最终效果都是从第一个NioEventLoop开始遍历到最后一个NioEventLoop,然后再从第一个NioEventLoop开始,如此循环。
[code]//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {    private final EventExecutor[] children;    private final EventExecutorChooserFactory.EventExecutorChooser chooser;    ...    //Create a new instance.    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);    }        //Create a new instance.    //@param nThreads,the number of threads that will be used by this instance.    //@param executor,the Executor to use, or null if the default should be used.    //@param chooserFactory,the EventExecutorChooserFactory to use.    //@param args,arguments which will passed to each #newChild(Executor, Object...) call    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {        if (nThreads
您需要登录后才可以回帖 登录 | 立即注册