大纲
1.Buffer缓冲区
2.Channel通道
3.BIO编程
4.伪异步IO编程
5.改造程序以支持长连接
6.NIO三大核心组件
7.NIO服务端的创建流程
8.NIO客户端的创建流程
9.NIO优点总结
10.NIO问题总结
1.Buffer缓冲区
(1)Buffer缓冲区的作用
(2)Buffer缓冲区的4个核心概念
(3)使用Direct模式创建的Buffer缓冲区
(4)如何分配和读写一个Buffer缓冲区
(5)如何操作一个分配好的Buffer缓冲区
(1)Buffer缓冲区的作用
在NIO中,所有的数据都是通过使用Buffer缓冲区来处理的。如果要通过NIO,将数据写到文件和网络或从文件和网络中读取数据,那么就需要使用Buffer缓冲区来进行处理。
(2)Buffer缓冲区的4个核心概念
Buffer缓冲区本质上是一个数组。通常它是一个字节数组ByteBuffer,当然还有其他基本类型的数组如CharBuffer。Buffer缓冲区不仅仅是一个数组,还可以对数据进行访问和对读写位置进行维护。所以Buffer缓冲区引入了4个核心概念:capacity、limit、position、mark。
一.capacity表示缓冲区的容量大小
也就是Buffer缓冲区里包含的数据的大小。如下所示,用字节数组封装了一个ByteBuffer,可以看其capacity是多少。- byte[] data = new byte[]{1, 2, 3};
- ByteBuffer buffer = ByteBuffer.wrap(data);
- System.out.println(buffer.capacity());
- //输出显示ByteBuffer的capacity是3,因为字节数组data的大小是3
复制代码 二.limit表示对Buffer缓冲区使用的一个限制
limit可以限制最多可以读取多少容量为capacity的缓冲区的数据。默认limit = capacity,limit后的位置不能读写数据。比如上述例子的3个数据,可用如下代码限制只能读取到index = 1的位置。- buffer.limit(1);
- System.out.println(buffer.limit());
复制代码 三.position表示数组中可以读写的位置
position不能大于limit。它会随着读写一直自动推进,直到跟limit一样才不能读写。如果手动设置的position大于limit,那么自动把limit设置为position。此外,remaining表示position到limit之间的距离。
四.mark表示对当前position位置的标记
在某个position的时候,设置一下mark,此时就可以设置一个标记。后续调用reset()方法可以把position复位到当时设置的那个mark位置上。把position或limit调整为小于mark的值时,就会丢弃这个mark。
总结:Buffer缓冲区就是一个数据缓冲区,可以支持不同的数据类型。比如ByteBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、IntBuffer,这些不同的Buffer里面就可以包裹不同基本类型的数组。每个Buffer类都是Buffer接口的子实例,大多数标准IO操作都使用ByteBuffer。
Buffer缓冲区还引入了capacity、limit、position、mark等概念,通过这些概念可以对Buffer缓冲区里的数据进行访问以及维护读写位置等信息。比如可通过limit、position、mark来限制能读那些数据,从哪里开始读。
下面是一个使用Buffer缓冲区的示例:- public class BufferDemo {
- public static void main(String[] args) throws Exception {
- byte[] data = new byte[] {55, 56, 57, 58, 59};
- ByteBuffer buffer = ByteBuffer.wrap(data);
-
- System.out.println(buffer.capacity());
- System.out.println(buffer.position());
- System.out.println(buffer.limit());
-
- System.out.println(buffer.get());//把当前position所在位置的数据读取一位出来
- System.out.println(buffer.position());
- buffer.mark();//在position = 1的时候打的mark,标记
-
- //buffer.position(3);
- //buffer.limit(4);
-
- buffer.position(3);
- System.out.println(buffer.get());
- System.out.println(buffer.position());
-
- buffer.reset();
- System.out.println(buffer.position());
- }
- }
复制代码 (3)使用Direct模式创建的Buffer缓冲区
有个缓冲区是比较特殊的,叫做Direct模式的缓冲区,它的整体性能比普通模式的缓冲区要高些。- //适用于从磁盘文件读数据出来,或者从网络里读数据进来
- ByteBuffer buffer1 = ByteBuffer.allocate(10);
- //如果用direct模式分配Buffer,整体性能可以比普通模式稍微高些
- ByteBuffer buffer2 = ByteBuffer.allocateDirect(10);
复制代码 (4)如何分配和读写一个Buffer缓冲区- 一.ByteBuffer.allocateDirect(100)
- 分配一个Direct缓冲区,效率更高。
- 二.ByteBuffer.wrap(byte[] array)
- 把一个字节数组作为数据放到缓冲区。
- 三.ByteBuffer.put(byte b) 和 ByteBuffer.get()
- 对当前position位置放入一个字节数据,或者读取一个字节数据。
- 四.ByteBuffer.put(byte[] src, int offset, int length) 和 ByteBuffer.get(byte[] dst, int offset, int length)
- 把指定src数组里的一段数据写入缓冲区,或者是从缓冲区里读取数据到数组中。
- 五.ByteBuffer.put(byte[] src) 和 ByteBuffer.get(byte[] dst)
- 把数组全部写入缓冲区,以及从缓冲区读取全部数据到数组里去。
复制代码 下面是这些API方法的使用示例:- public class BufferDemo2 {
- public static void main(String[] args) throws Exception {
- ByteBuffer buffer = ByteBuffer.allocateDirect(100);
-
- System.out.println("position=" + buffer.position());
- System.out.println("capacity=" + buffer.position());
- System.out.println("limit=" + buffer.limit());
-
- byte[] src = new byte[] {1, 2, 3, 4, 5};
- buffer.put(src);
- System.out.println("position=" + buffer.position());//position = 0~4,都填充了数据
-
- //此时position = 5,如果直接读取数据,是读不到的,会发现是空的、没有数据
- //所以需要手动操作一下position,调整到position = 0的位置,才能读到数据大小为5的字节数组src
- buffer.position(0);
- byte[] dst = new byte[5];
- buffer.get(dst);
-
- System.out.println("position=" + buffer.position());
- System.out.print("dst=[");
-
- for (int i = 0; i < dst.length; i++) {
- System.out.print(i);
- if (i < dst.length - 1) {
- System.out.print(",");
- }
- }
- System.out.print("]");
- }
- }
复制代码 (5)如何操作一个分配好的Buffer缓冲区- 一.ByteBuffer.clear()
- 还原缓冲区的状态:position设置为0、limit设置为capacity、丢弃mark。
- 但是本质并不是删除数据,只是还原了那些标记位而已。
- 还原之后就可以复用缓冲区里的空间,覆盖老的数据了。
- 二.ByteBuffer.flip()
- 准备读取刚写入的数据,就是将limit设置为当前position、将position设置为0、丢弃mark。
- 一般在写入完数据到缓冲区后,准备从位置=0开始读这段数据时,就可以使用flip。
- 三.ByteBuffer.rewind()
- 将position设置为0、并且丢弃mark。
- 一般在读取了一遍数据后,还想要再次重新读取一遍数据时,就可以使用rewind,此时limit是不变的。
复制代码 一般比较少会遇到直接操作Buffer的position、limit和mark的场景,通常都会使用上述操作Buffer缓冲区的几个方法:首先执行put()方法往缓冲区里写入数据,然后打算复用时,就执行clear()方法再重新写数据。如果写了一段数据打算要读取数据了,就执行flip()方法。如果希望重新读取一遍数据,就执行rewind()方法。
2.Channel通道
(1)NIO编程中的Channel是什么
(2)NIO编程中Buffer和Channel的关系
(3)基于FileChannel将数据写入磁盘文件
(4)利用FileChannel实现顺序写和随机写
(5)FileChannel是多线程并发安全的
(6)从磁盘文件读取数据到Buffer缓冲区
(7)对文件上共享锁限制文件只读
(8)FileChannel的强制刷盘
(9)FileChannle的position
(1)NIO编程中的Channel是什么
Channel是一个通道,网络数据通过Channel读取和写入。通道可以用于读、写或者二者同时进行。通道与流的区别在于:通道是双向的,流是单向的,一个流必须是InputStream和OutputStream的子类。
因为Channel是全双工的,所以它可以比流更好映射底层操作系统的API。在UNIX网络编程中,底层操作系统的通道都是全双工的,同时支持读写操作。
Channel可以分为两大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel,其中ServerSocketChannel和SocketChannel都是SelectableChannel的子类。
(2)NIO编程中Buffer和Channel的关系
(3)基于FileChannel将数据写入磁盘文件- public class FileChannelDemo {
- public static void main(String[] args) throws Exception {
- //构造一个传统的文件输出流
- FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
- //通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
- FileChannel channel = out.getChannel();
- ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
- channel.write(buffer);
- channel.close();
- out.close();
- }
- }
复制代码 (4)利用FileChannel实现顺序写和随机写
顺序写磁盘文件,就是不停地在文件末尾追加数据。
一.顺序写的例子- public class FileChannelDemo {
- public static void main(String[] args) throws Exception {
- //构造一个传统的文件输出流
- FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
- //通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
- FileChannel channel = out.getChannel();
- ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
- channel.write(buffer);
- //channel必然会从buffer的position = 0的位置开始读起,一直读到limit,limit = 字符串字节数组的长度
- //buffer的position此时就已经变成了跟limit一样了
- System.out.println(buffer.position());//输出11
- System.out.println(channel.position());//输出11,当前写到了文件的哪一个位置
-
- //继续往磁盘文件里写,就是从FileChannel的position = 11开始写,相当于文件末尾追加
- //如果想再次将buffer里的数据通过channel写入磁盘文件末尾,就是顺序写
- buffer.rewind();//position = 0,重新读一遍
- channel.write(buffer);//在文件末尾追加写的方式,顺序写,写完后文件内容就是"hello worldhello world"
- channel.close();
- out.close();
- }
- }
复制代码 二.随机写的例子- public class FileChannelDemo {
- public static void main(String[] args) throws Exception {
- //构造一个传统的文件输出流
- FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
- //通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
- FileChannel channel = out.getChannel();
- ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
- channel.write(buffer);
- //channel必然会从buffer的position = 0的位置开始读起,一直读到limit,limit = 字符串字节数组的长度
- //buffer的position此时就已经变成了跟limit一样了
- System.out.println(buffer.position());//输出11
- System.out.println(channel.position());//输出11,当前写到了文件的哪一个位置
- //如果继续往磁盘文件里写,就是从FileChannel的position = 11开始写,相当于文件末尾追加
- //现在已经在文件写入hello world,要继续在hello和world中间的那个空格地方,再写入一条数据,比如hello world
- //把这一段数据插入到磁盘文件的中间,就是磁盘随机写
-
- //在文件的随机的位置写入数据,肯定是要再次从buffer中读取数据,所以position必须复位
- buffer.rewind();
- //其次如果你要基于FileChannel随机写,可以调整FileChannel的position,这样文件内容为"hellohello world"
- channel.position(5);
- channel.write(buffer);
- channel.close();
- out.close();
- }
- }
复制代码 (5)FileChannel是多线程并发安全的
多线程并发写FileChannel是线程安全的。一个线程先执行完了写文件,下一个线程才能有机会去写,不可能多个线程同时基于一个FileChannel来写的。- public class FileChannelDemo2 {
- @SuppressWarnings("resource")
- public static void main(String[] args) throws Exception {
- //构造一个传统的文件输出流
- FileOutputStream out = new FileOutputStream("/Users/demo/Downloads/hello.txt");
- //通过文件输出流获取到对应的FileChannel,以NIO的方式来写文件
- final FileChannel channel = out.getChannel();
- //输出文件的内容是10个顺序的hello world
- for (int i = 0; i < 10; i++) {
- new Thread() {
- public void run() {
- try {
- ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
- channel.write(buffer);
- } catch (Exception e) {
- e.printStackTrace();
- }
- };
- }.start();
- }
- }
- //当然这里的out并没有关闭
- }
复制代码 (6)从磁盘文件读取数据到Buffer缓冲区
通过FileChannel + FileInputStream可以实现从磁盘文件读取数据到缓冲区。- public class FileChannelDemo3 {
- public static void main(String[] args) throws Exception {
- FileInputStream in = new FileInputStream("/Users/demo/Downloads/hello.txt");
- FileChannel channel = in.getChannel();
- ByteBuffer buffer = ByteBuffer.allocateDirect(16);
- channel.read(buffer);//读数据写入buffer,所以写完以后,buffer的position = 16
- buffer.flip();//position = 0,limit = 16
- byte[] data = new byte[16];
- buffer.get(data);
- System.out.println(new String(data));
- channel.close();
- in.close();
- }
- }
复制代码 (7)对文件上共享锁限制文件只读
JVM内的多个线程对同一个FileChannel进行写,是线程安全的。但如果是多个JVM对同一个FileChannel进行写,则不是线程安全。
为此,FileChannel提供了文件锁功能,可以对文件上锁。FileChannel的文件锁分为共享锁和独占锁。
有线程先获取了共享锁读文件,其他JVM里的线程就不能获取独占锁写文件。有线程先获取了独占锁写文件,其他JVM里的线程就不能获取共享锁读文件。
如果对文件上共享锁,不同JVM的不同线程都可以同时获取共享锁读文件,除非已有某个JVM的某线程获取到了独占锁写文件。
一.先运行如下的FileLockDemo1程序启动一个JVM对文件加共享锁- public class FileLockDemo1 {
- public static void main(String[] args) throws Exception {
- //新建一个输入流
- FileInputStream in = new FileInputStream("/Users/demo/Downloads/hello.txt");
- FileChannel channel = in.getChannel();
-
- //注意输入流是不能加独占锁的,否则会报错
- channel.lock(0, Integer.MAX_VALUE, true);//加共享锁
- Thread.sleep(60 * 60 * 1000);
- channel.close();
- in.close();
- }
- }
复制代码 二.再运行如下的FileLockDemo2程序启动另一个JVM
由于该程序对文件加的是独占锁,所以此时会报错,只能加共享锁。- public class FileLockDemo2 {
- public static void main(String[] args) throws Exception {
- RandomAccessFile file = new RandomAccessFile("/Users/demo/Downloads/hello.txt", "rw");
- FileChannel channel = file.getChannel();
-
- //加独占锁,阻塞住,会等待别人释放共享锁了,这里才能成功加独占锁
- channel.lock(0, Integer.MAX_VALUE, false);
- System.out.println("加锁成功");
- Thread.sleep(60 * 60 * 1000);
-
- channel.close();
- file.close();
- }
- }
复制代码 (8)FileChannel的强制刷盘
通过FileChannel写数据到磁盘文件时,不会立即将数据写到磁盘上,而会先将数据写到操作系统自己管理的一个内存区域OS Cache上。这样处理的好处是可以让磁盘写的性能比较高,但如果此时系统宕机,那么OS Cache里的数据可能就会丢失。
Kafka和ElasticSearch将数据写到磁盘时,都是先写入OS Cache的,操作系统会定时把OS Cache里的数据写入到磁盘上。
如果希望数据可以马上写入磁盘,就不能只用FileChannel.write()方法,还需要通过FileChannel.force()方法将数据从OS Cache强制刷入磁盘。当然,FileChannel将数据强制刷盘会让磁盘写的性能降低。- public class FileLockDemo1 {
- public static void main(String[] args) throws Exception {
- //新建一个输入流
- FileInputStream in = new FileInputStream("/Users/demo/Downloads/hello.txt");
- FileChannel channel = in.getChannel();
-
- ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
- channel.write(buffer);
- channel.force(true);//强制数据从OS Cache刷入磁盘,避免停留在OS Cache中
- channel.close();
- in.close();
- }
- }
复制代码 (9)FileChannel的position
FileChannel有一个position的概念,它相当于一个指针,指向了当前文件的某个位置。
读数据是从FileChannel当前的position开始读的,每次刚开始初始化一个FileChannel后,position = 0。如果要随机读取文件里的某个位置,那么直接设置和定位FileChannel的position,就可以限定从什么位置开始读。
写数据也是从FileChannel当前的position开始写的,刚开始position = 0。如果直接写,那么可能会覆盖原来的老数据。读了多少字节的数据或写了多少字节的数据,FileChannel的position就会往前移动多少位。
3.BIO编程
(1)网络编程的Client/Server模型
(2)BIO服务端通信模型
(3)BIO网络编程的服务端
(4)BIO网络编程的客户端
(1)网络编程的Client/Server模型
网络编程的基本模型是Client/Server模型。客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接。如果连接建立成功,双方就可以通过网络套接字Socket进行通信。
在基于传统同步阻塞模型(BIO)开发中:ServerSocket负责绑定IP地址、启动监听端口,Socket负责发起连接操作。在连接成功后,双方通过输入和输出流进行同步阻塞通信。
(2)BIO服务端通信模型
通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求后会为每个客户端创建一个新的线程进行业务处理。业务处理完成后,通过输出流返回应答给客户端,然后销毁线程。这是典型的一请求一应答通信模型。
(3)BIO网络编程的服务端- //服务端(为简洁省略try catch)
- public class SocketServer {
- public static void main(String[] args) throws Exception {
- ServerSocket serverSocket = new ServerSocket(9000);
- Socket socket = serverSocket.accept();//在这里会阻塞住,一直等待客户端建立连接
- //如果有个客户端要和当前服务端尝试建立TCP连接,并基于TCP协议来传输数据,发送一个一个的TCP包
- //那么客户端必须先和当前服务端的端口号建立连接
- //客户端Socket和服务端ServerSocket互相传递3次握手的TCP包,TCP连接就建立了
-
- //当客户端向当前服务端发起TCP三次握手建立一个连接后,这里就会构建出一个Socket
- //这个Socket就代表了当前服务端和某个客户端的一个TCP连接(Socket连接)
- //当建立了TCP连接后,客户端就会通过IO流的方式发送数据过来
- //由于IO流是无限的流,所以底层的TCP协议会把流式的数据拆分为一个一个的TCP包
- //然后将TCP包包裹在IP包里,IP包又会被包裹在以太网包里
- //最后通过底层的网络硬件设备以及以太网的协议,发送以太网包的数据
- //获取Socket的输入流
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- //获取Socket的输出流
- OutputStream out = socket.getOutputStream();
- char[] buf = new char[1024 * 1024];//JVM的一个缓冲数组
- int len = in.read(buf);
- //Socket的输入流,相当于不停读取客户端通过TCP协议发送过的一个一个的TCP包
- //然后把TCP包里的数据通过IO输入流的方式提供给服务端
- //这样服务端就可以通过IO输入流读取的方式,把TCP包里的数据读出来,然后放入JVM内存的一个缓冲数组中
- while (len != -1) {
- String request = new String(buf, 0, len);
- System.out.println("服务端接收到了请求:" + request);
- //输出流的意思是,服务端会通过IO流发送响应数据回客户端
- //此时在底层会把服务端的响应数据拆分为一个一个的TCP包,回传给客户端
- //这样客户端就可以接收到服务端发送的TCP包了
- out.write("收到,收到".getBytes());
-
- //in.read会阻塞在这里尝试读取数据,所以out.write要放在前面
- len = in.read(buf);
- //为什么需要通过while循环反复去读取Socket流传输过来的数据?
- //因为客户端是不停地用流的方式发送数据给服务端的,所以服务端需要不停地读取
- //此外,由于buf才1KB,如果客户端发送过来的数据是几十KB,
- //那么当服务端读取完1KB的数据后,还需要继续读取几十KB的数据,
- //因此才需要服务端不停的读取
- }
- //释放资源
- out.close();
- in.close();
- //通过TCP四次挥手断开连接
- socket.close();
- serverSocket.close();
- }
- }
复制代码 (4)BIO网络编程的客户端- //客户端(为简洁省略try catch)
- public class SocketClient {
- public static void main(String[] args) throws Exception {
- //此处应该是会找DNS服务查找域名对应的IP地址
- Socket socket = new Socket("localhost", 9000);
- //接下来需要跟某个IP地址上的9000端口的服务器程序进行TCP三次握手,然后建立连接
- //这时就会构造一个三次握手中的第一次握手的TCP包,在这个TCP包里放入三次握手需要的数据
- //然后把这个TCP包封装在IP包里,IP包里是有对应的目标的IP地址,IP包再封装在以太网包里
- //接着通过底层的硬件设备和以太网协议把以太网包发送出去
- //经过路由器时,会通过IP地址查找路由表来确定下一个路由器的位置
- //查找到下一个路由器的mac地址后将其写入到以太网包头,继续通过下一个子网广播出去
- //通过这种方式层层转发,一直到对应的服务器上去
- //服务端接收到三次握手的第一次握手的TCP包后
- //服务端就会回传第二次握手的TCP包给这个客户端的程序,客户端会再次发送第三次握手的TCP包过去
- //这样三次握手成功,TCP连接建立起来了
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
-
- //发送数据流,底层拆分为一个一个的TCP包发到服务端去
- out.write("你好".getBytes());
- char[] buf = new char[1024 * 1024];
- int len = in.read(buf);
- while (len != -1) {
- String response = new String(buf, 0, len);
- System.out.println("客户端接收到了响应:" + response);
- len = in.read(buf);
- }
-
- //释放资源
- in.close();
- out.close();
- //通过TCP四次挥手断开连接
- socket.close();
- }
- }
复制代码 (5)BIO网络编程存在的问题
缺乏弹性伸缩能力,服务端的线程个数和客户端的并发访问数呈1 : 1的正比关系。当客户端并发访问量增加后,可能会耗尽服务端的线程资源。
4.伪异步IO编程
(1)BIO的主要问题
(2)BIO编程模型的改进
(3)伪异步IO编程
(4)伪异步IO的问题
(5)伪异步IO可能引起的级联故障
(1)BIO的主要问题
BIO的主要问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理新接入的客户端链路,一个线程只能处理一个客户端连接。
比如在上述BIO编程的服务端程序中:SocketServer和一个客户端建立连接后,服务端会一直执行in.read(buf)进行阻塞等待,从而导致服务端只能和这个客户端进行通信。如果有一个新的客户端想和服务端建立连接,那么服务端是不能和这个新的客户端建立TCP连接的,因为服务端还在阻塞执行in.read(buf)而没能执行serverSocket.accept()。
(2)BIO编程模型的改进
为了改进一个线程处理一个连接的BIO模型,可以通过线程池或者消息队列,实现一个或者多个线程处理N个客户端的伪异步IO模型。
此时由于底层通信机制依然使用同步阻塞IO,所以被称为伪异步IO。服务端线程池的最大线程数N和客户端并发访问数M呈N : M的比例关系。改进后的BIO模型能有效防止海量并发接入导致线程资源耗尽。
(3)伪异步IO编程
当有新的客户端接入时,将客户端的Socket封装成一个任务Task(实现Runnable接口)投递到后端的线程池中进行处理。这个线程池中会维护一个消息队列和N个活跃的线程来对消息队列中的任务Task进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此线程资源的占用是可控的,不会导致线程耗尽而宕机。- //服务端(为简洁省略try catch)
- public class SocketServer {
- public static void main(String[] args) throws Exception {
- ServerSocket serverSocket = new ServerSocket(9000);
- ServerHandlerExecutePool singleExcutor = new ServerHandlerExecutePool(50, 1000);
- while (true) {
- //在这里会阻塞住,一直等待客户端来建立连接
- Socket socket = serverSocket.accept();
- //获取客户端的连接后,将socket提交给线程池处理
- singleExcutor.execute(new ServerHandler(socket));
- }
- ...
- }
- }
- public class ServerHandlerExecutePool {
- private ExecutorService executor;
- public ServerHandlerExecutePool(int maxPoolSize, int queueSize) {
- executor = new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- maxPoolSize,
- 120L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize)
- );
- }
-
- public void execute(Runnable task) {
- executor.execute(task);
- }
- }
- public class ServerHandler implements Runnable {
- private Socket socket;
- public ServerHandler(Socket socket) {
- this.socket = socket;
- }
-
- @Override
- public void run() {
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
- char[] buf = new char[1024 * 1024];
- int len = in.read(buf);
- while (len != -1) {
- String request = new String(buf, 0, len);
- System.out.println("[" + Thread.currentThread().getName() + "]服务端接收到了请求:" + request);
- out.write("收到,收到".getBytes());
- len = in.read(buf);
- }
- //释放资源
- out.close();
- in.close();
- //通过TCP四次挥手断开连接
- socket.close();
- }
- }
复制代码 (4)伪异步IO的问题
一.当对Socket的输入流进行读操作时
也就是调用Java输入流InputStream的read()方法读输入流时,会一直阻塞下去直到可用数据已经读取完毕。这意味着当一方发送请求或者应答消息比较缓慢或者网络传输比较缓慢时,读取输入流的一方的通信线程将被长时间阻塞。如果发送方要60s才能够将数据发送完毕,读取方的IO线程在执行read()方法时会被同步阻塞60s。在此期间,其他的连接任务只能在消息队列中排队。
二.当对Socket的输出流进行写操作时
也就是调用Java输出流OutputStream的write()方法写输出流时,也会一直阻塞下去直到所有要发送的字节全部写入完毕。当消息的接收方处理缓慢时,将不能及时从TCP缓冲区中读取数据。这将会导致发送方的TCP Window Size不断减小,直到为0。最后消息发送方将不能再向TCP缓冲区写入消息,于是write()方法将被阻塞直到TCP Window Size大于0。所以输入流和输出流的读和写操作都是同步阻塞的,阻塞时间取决于对方IO线程的处理速度和网络IO的传输速度。
(5)伪异步IO可能引起的级联故障
一.服务端处理缓慢,返回应答消息耗费60s,平时只需10ms。
二.此时客户端在读取服务端响应,由于读取输入流是阻塞的,客户端将会被同步阻塞60s。
三.假如所有可用线程都被该服务端阻塞,那后续的所有IO消息都将在队列中排队。
四.由于线程池采用阻塞队列实现,当队列积满后,后续入队的操作将会被阻塞。
五.由于只有一个Acceptor线程接收客户端接入,线程池的阻塞队列满了之后,它也会被阻塞。于是新的客户端请求消息将会被拒绝,从而客户端发生大量的连接超时。
因此BIO模型显然无法支持百万并发请求,因为一个线程只能处理一个请求。改进BIO后的伪异步IO模型即便可以通过一个线程处理多个请求,也存在级联故障的问题。
5.改造程序以支持长连接
(1)什么是长连接和短连接
(2)长连接和短连接的代码
(1)什么是长连接和短连接
一.短连接
客户端每次建立一个连接,发送一个请求,获取一个响应,然后就断开连接,就是所谓的短连接。
二.长连接
客户端每次建立一个连接,可以发送很多个请求,一直持续维持这个TCP连接,不断开连接。客户端持续通过这个连接与服务端进行通信,不停地发送数据和请求。服务端也长期维持这个连接,不停地接受请求返回响应。这个就是所谓的长连接,连接存在的时间很长的。所以只要客户端不停地发送请求不释放连接,那么就是长连接了。
(2)长连接和短连接的代码- //客户端(短连接)(为简洁省略try catch)
- public class SocketClient {
- public static void main(String[] args) throws Exception {
- for (int i = 0; i < 10; i++) {
- new Thread() {
- public void run() {
- Socket socket = new Socket("localhost", 9000);
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
- //发送数据流,底层拆分为一个一个的TCP包发过去
- out.write("你好".getBytes());
- char[] buf = new char[1024 * 1024];
- int len = in.read(buf);
- if (len != -1) {
- String response = new String(buf, 0, len);
- System.out.println("[" + Thread.currentThread().getName() + "]客户端接收到了响应:" + response);
- }
- in.close();
- out.close();
- socket.close();
- };
- }.start();
- }
- }
- }
- //客户端(长连接)(为简洁省略try catch)
- public class SocketClient {
- public static void main(String[] args) throws Exception {
- for (int i = 0; i < 10; i++) {
- new Thread() {
- public void run() {
- Socket socket = new Socket("localhost", 9000);
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
- char[] buf = new char[1024 * 1024];
- //客户端不停地每隔一秒发送请求,不释放就是长连接了
- while (true) {
- //发送数据流,底层拆分为一个一个的TCP包发过去
- out.write("你好".getBytes());
- int len = in.read(buf);
- if (len != -1) {
- String response = new String(buf, 0, len);
- System.out.println("[" + Thread.currentThread().getName() + "]客户端接收到了响应:" + response);
- }
- Thread.sleep(1000);
- }
- ...
- };
- }.start();
- }
- }
- }
复制代码
6.NIO三大核心组件
(1)Buffer缓冲区
(2)Channel数据通道
(3)Selector多路复用器
(4)BIO和IO多路复用的理解
(5)一些概念补充
NIO的三大核心组件分别是:Buffer、Channel、Selector。
(1)Buffer缓冲区
Buffer缓冲区是用来封装数据的,也就是把数据写入到Buffer、或者从Buffer中读取数据。
(2)Channel数据通道
Channel就是一个数据管道,负责传输数据(封装好数据的Buffer),比如把数据写入到文件或网络、从文件或网络中读取数据。
(3)Selector多路复用器
Selector会不断地轮询注册在其上的Channel。如果某个Channel上发生读或写事件,那么这个Channel就处于就绪状态。然后就绪的Channel就会被Selector轮询出来,具体就是通过Selector的SelectionKey来获取就绪的Channel集合。获取到就绪的Channel后,就可以进行后续的IO操作了。
一个Selector多路复用器可以同时轮询多个Channel。由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这意味着只需要一个线程负责Selector多路复用器的轮询,就可以接入成千上万的客户端。
(4)BIO和IO多路复用的理解
由于TCP连接的建立需要经过三次握手,所以可理解为客户端向服务端发起的Socket连接就绪需要经过三次握手请求。服务端接收到客户端的第一次握手请求时,会创建Socket连接(即创建一个Channel)。服务端接收客户端的第三次握手请求,这个Socket连接才算准备好(Channel才算就绪)。
在BIO模型下,只有一次系统调用recvfrom。所以服务端从接收到客户端的第一次握手请求开始,就必须同步阻塞等待直到第三次握手请求的完成,这样才能获取准备好的Socket连接并读取请求数据。
在多路复用模型下,会有两次系统调用select和recvfrom。所以服务端接收到客户端的第一次握手请求后,不必创建线程然后通过阻塞来等待第三次握手请求的完成,而是可以直接通过轮询Selector(基于select系统调用),来获取所有已经完成第三次握手请求(已就绪)的客户端Socket连接,之后对这些Socket连接进行recvfrom系统调用时不需要阻塞也能马上读取到请求数据了。
(5)一些概念补充
一.异步非阻塞IO
NIO类库支持非阻塞的读和写操作。相比BIO同步阻塞的读和写操作,NIO是异步的,所以也称NIO为异步非阻塞IO。
二.多路复用Selector
NIO的实现关键是多路复用IO技术。多路复用的核心是通过Selector来轮询注册在其上的Channel,执行Selector.select()方法进行轮询时是同步阻塞的。当发现有Channel处于就绪状态后,Selector.select()方法就会从阻塞状态中返回,然后就可以通过Selector.selectedKeys()方法获取就绪的Channel进行IO操作。
三.伪异步IO
在通信线程和业务线程之间做一个缓冲区,这个缓冲区用于隔离IO线程和业务线程间的直接访问,这样业务线程就不会被IO线程阻塞了。
对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不再直接访问IO线程或者进行IO读写,这样也就不会被同步阻塞。
7.NIO服务端的创建流程
步骤一:通过ServerSocketChannel的opne()方法打开ServerSocketChannel。
步骤二:设置ServerSocketChannel为非阻塞模式,绑定监听地址和端口。
步骤三:通过Selector的open()方法创建多路复用器Selector,将已打开的ServerSocketChannel注册到多路复用器Selector上以及监听ACCEPT事件。
步骤四:多路复用器Selector通过select()方法轮询准备就绪的SelectionKey。
步骤五:如果这个SelectionKey是acceptable,说明有客户端发起了连接请求。此时需要调用ServerSocketChannel的accept()方法来处理该接入请求,也就是完成TCP三次握手并建立物理链路以及得到该客户端连接SocketChannel。
步骤六:然后将新接入的客户端连接SocketChannel注册到多路复用器Selector上以及监听READ事件。
步骤七:如果这个SelectionKey是readable,说明有客户端发送了数据过来。此时需要调用SocketChannel的read()方法异步读取客户端发送的数据到ByteBuffer缓冲区,同时将客户端连接SocketChannel注册到多路复用器Selector上以及监听WRITE事件。
步骤八:接着对ByteBuffer缓冲区的数据进行decode解码处理并完成业务逻辑,然后再将处理结果对象encode编码放入ByteBuffer缓冲区,最后调用SocketChannel的write()方法异步发送给客户端,以及将客户端连接SocketChannel注册到多路复用器Selector上以及监听READ事件。- public class NIOServer {
- private static Selector selector;
-
- public static void main(String[] args) {
- init();
- listen();
- }
- private static void init() {
- //serverSocketChannel其实就是ServerSocket,负责跟各个客户端连接连接请求
- //SelectionKey.OP_ACCEPT的意思是仅仅关注ServerSocketChannel接收到的TCP连接请求,也就是监听ACCEPT事件
- //ServerSocketChannel是注册在Selector上面的
- ServerSocketChannel serverSocketChannel = null;
- try {
- selector = Selector.open();
- serverSocketChannel = ServerSocketChannel.open();//步骤一:打开ServerSocketChannel
- serverSocketChannel.configureBlocking(false);//步骤二:NIO就是支持非阻塞的
- serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100);//步骤二
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//步骤三:注册到selector + 关注OP_ACCEPT事件
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private static void listen() {
- while(true) {
- try {
- //select()方法是阻塞的,看注册到Selector上的ServerSocketChannel是否接收到了请求
- selector.select();//步骤四
- Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
- while(keysIterator.hasNext()) {
- SelectionKey key = (SelectionKey) keysIterator.next();
- //可以认为一个SelectionKey代表了一个请求
- keysIterator.remove();
- handleRequest(key);
- }
- } catch(Throwable t){
- t.printStackTrace();
- }
- }
- }
- private static void handleRequest(SelectionKey key) throws IOException, ClosedChannelException {
- //会有个线程池获取到了请求,下面的代码都应该在线程池的工作线程里处理
- SocketChannel channel = null;
- try {
- //如果这个SelectionKey是acceptable,也就是连接请求
- //那么注册对应的SocketChannel到selector上,并关注OP_READ事件
- if (key.isAcceptable()) {//步骤五
- System.out.println("[" + Thread.currentThread().getName() + "]接收到连接请求");
- //从SelectionKey中拿出ServerSocketChannel
- ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
- //调用ServerSocketChannel的accept方法,就可以跟客户端进行TCP三次握手
- channel = serverSocketChannel.accept();
- System.out.println("[" + Thread.currentThread().getName() + "]建立连接时获取到的channel=" + channel);
- //如果三次握手成功了之后,就可以获取到一个建立好TCP连接的SocketChannel
- //这个SocketChannel大概可以理解为,底层有一个Socket是跟客户端进行连接的
- //这个SocketChannel就是联通到那个Socket上去,负责进行网络数据的读写的
- //下面配置成非阻塞的
- channel.configureBlocking(false);
- //将该SocketChannel注册到selector上,且仅仅关注READ请求,也就是关注发送数据过来的请求
- channel.register(selector, SelectionKey.OP_READ);//步骤六:注册到selector + 关注OP_READ事件
- } else if (key.isReadable()) {//步骤七
- //如果这个SelectionKey是readable,也就是发送了数据过来,此时需要读取客户端发送过来的数据
- channel = (SocketChannel) key.channel();
- //读取请求数据的buffer缓冲
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- //通过底层的socket读取数据,写入buffer中,position可能就会变成比如21之类的
- //socket读取到了多少个字节,此时buffer的position就会变成多少
- int count = channel.read(buffer);//步骤七
- System.out.println("[" + Thread.currentThread().getName() + "]接收到请求");
- if (count > 0) {
- //设置position = 0,limit = 21,仅仅读取buffer中,0~21这段刚刚写入进去的数据
- buffer.flip();
- System.out.println("服务端接收请求:" + new String(buffer.array(), 0, count));
- channel.register(selector, SelectionKey.OP_WRITE);//步骤七:注册到selector + 关注OP_WRITE事件
- }
- } else if (key.isWritable()) {//步骤八
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.put("Server收到了".getBytes());
- buffer.flip();
- channel = (SocketChannel) key.channel();
- channel.write(buffer);//步骤八
- channel.register(selector, SelectionKey.OP_READ);//步骤八:注册到selector + 关注OP_READ事件
- }
- } catch(Throwable t) {
- t.printStackTrace();
- if (channel != null) {
- channel.close();
- }
- }
- }
- }
复制代码
8.NIO客户端的创建流程
步骤一:通过SocketChannel的open()方法打开SocketChannel。
步骤二:设置SocketChannel为非阻塞模式,同时设置TCP参数。
步骤三:调用SocketChannel的connect()方法异步连接服务端,完成TCP三次握手并建立物理链路。
步骤四:通过Selector的open()方法创建多路复用器Selector,并将已打开的SocketChannel注册到多路复用器Selector上以及监听CONNECT事件。
步骤五:多路复用器Selector通过select()方法轮询准备就绪的SelectionKey。
步骤六:如果这个SelectionKey是connectable,说明服务端接受了发起的连接请求,于是将SocketChannel注册到多路复用器Selector上以及监听READ事件。
步骤七:如果这个SelectionKey是readable,说明服务端返回了数据。于是调用SocketChannel的read()方法异步读取服务端返回的数据到ByteBuffer缓冲区,同时将客户端连接SocketChannel注册到多路复用器Selector上以及监听WRITE事件。
步骤八:接着对ByteBuffer缓冲区的数据进行decode解码处理并完成业务逻辑,然后再将处理结果对象encode编码放入ByteBuffer缓冲区,最后调用SocketChannel的write()方法异步发送给客户端,以及将客户端连接SocketChannel注册到多路复用器Selector上以及监听READ事件。- public class NIOClient {
- public static void main(String[] args) {
- //启动10个线程去和服务端建立通信
- for (int i = 0; i < 10; i++) {
- new Worker().start();
- }
- }
- static class Worker extends Thread {
- @Override
- public void run() {
- SocketChannel channel = null;
- Selector selector = null;
- try {
- //SocketChannel底层就是封装了一个Socket
- //SocketChannel是通过底层的Socket网络连接上服务端的数据通道,负责基于网络读写数据
- channel = SocketChannel.open();//步骤一
- //配置成非阻塞的
- channel.configureBlocking(false);//步骤二
- //底层会发起了一个TCP三次握手尝试建立连接
- channel.connect(new InetSocketAddress("localhost", 9000));//步骤三
- selector = Selector.open();
- //监听connect行为
- channel.register(selector, SelectionKey.OP_CONNECT);//步骤四:注册到selector + 关注OP_CONNECT事件
- while(true) {
- //三次握手成功后,服务端会给客户端返回一个响应请求,通过如下代码把响应请求拿到
- selector.select();//步骤五
- Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
- while(keysIterator.hasNext()) {
- SelectionKey key = (SelectionKey) keysIterator.next();
- keysIterator.remove();
- //如果server返回的是一个connectable的消息
- if (key.isConnectable()) {//步骤六
- channel = (SocketChannel) key.channel();
- if (channel.isConnectionPending()) {
- //一旦建立连接成功了以后,此时就可以给server发送一个数据了
- channel.finishConnect();
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.put("你好".getBytes());
- buffer.flip();
- channel.write(buffer);
- }
- //接下来监听READ事件就是准备读服务端返回的数据
- channel.register(selector, SelectionKey.OP_READ);//步骤六:注册到selector + 关注OP_READ事件
- } else if (key.isReadable()) {//步骤七:说明服务器端返回了一条数据可以读了
- channel = (SocketChannel) key.channel();
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- int len = channel.read(buffer);//把数据写入buffer,position推进到读取的字节数数字
- if (len > 0) {
- System.out.println("[" + Thread.currentThread().getName() + "]收到响应:" + new String(buffer.array(), 0, len));
- Thread.sleep(5000);
- //睡眠5秒后,接下来继续监听WRITE事件,并准备写数据给服务端
- channel.register(selector, SelectionKey.OP_WRITE);//步骤七:注册到selector + 关注OP_WRITE事件
- }
- } else if (key.isWritable()) {//步骤八
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.put("重复你好了".getBytes());
- buffer.flip();
- channel = (SocketChannel) key.channel();
- channel.write(buffer);
- //再次重复发数据给服务端后,接下来要监听READ事件就是准备读服务端返回的数据
- channel.register(selector, SelectionKey.OP_READ);//步骤八:注册到selector + 关注OP_READ事件
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (channel != null){
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
复制代码
9.NIO优点总结
优点一:SocketChannel的连接操作是异步的
也就是客户端发起的连接操作SocketChannel.connect()是异步的。可以将SocketChannel注册到多路复用器上并关注OP_CONNECT事件等待后续结果,不需要像BIO的客户端那样被同步阻塞。
优点二:SocketChannel的读写操作都是异步的
也就是客户端发起的读写操作SocketChannel.read()和write()是异步的。如果没有可读写的数据它不会同步等待,而会直接返回。这样IO通信链路就可以处理其他链路了,不需要同步等待这个链路可用。
优点三:优化了线程模型
由于JDK的Selector在Linux等主流操作系统上通过epoll实现,从而没有了连接句柄数的限制。这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随客户端增加而线性下降。注意:Selector.select()是同步阻塞的。
优点四:优化了IO读写
BIO的读写是面向流的,一次性只能从流中读取一子节或多字节,而且读完后流无法再读取,需要自己缓存数据。NIO的读写是面向Buffer的,可随意读取里面任何字节的数据。不需要自己缓存数据,只需要移动读写指针即可。
10.NIO问题总结
问题一:NIO的类库和API繁杂,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
问题二:处理常见问题的工作量和难度比较大,比如客户断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流等。
问题三:NIO的epoll bug会导致Selector空轮询,最终导致CPU达到100%。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |