找回密码
 立即注册
首页 业界区 业界 Netty基础—7.Netty实现消息推送服务

Netty基础—7.Netty实现消息推送服务

笃扇 3 天前
大纲
1.Netty实现HTTP服务器
2.Netty实现WebSocket
3.Netty实现的消息推送系统
(1)基于WebSocket的消息推送系统说明
(2)消息推送系统的PushServer
(3)消息推送系统的连接管理封装
(4)消息推送系统的ping-pong探测
(5)消息推送系统的全连接推送
(6)消息推送系统的HTTP响应和握手
(7)消息推送系统的运营客户端
(8)运营客户端连接PushServer
(9)运营客户端的Handler处理器
(10)运营客户端发送推送消息
(11)浏览器客户端接收推送消息
 
1.Netty实现HTTP服务器
(1)HTTP请求消息和响应消息
(2)Netty实现的HTTP协议栈的优势
(3)Netty实现的HTTP服务器
(4)请求的解析处理和响应的编码处理
 
(1)HTTP请求消息和响应消息
HTTP请求消息由三部分组成:请求行、请求头、请求体。HTTP响应消息也由三部分组成:响应行、响应头、响应体。
 
(2)Netty实现的HTTP协议栈的优势
Netty实现的HTTP协议栈无论在性能还是在可靠性上,都表现优异,非常适合在非Web容器下的场景使用。相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。
 
(3)Netty实现的HTTP服务器
  1. public class NettyHttpServer {
  2.     private static final Logger logger = LogManager.getLogger(NettyHttpServer.class);
  3.     private static final int DEFAULT_PORT = 8998;
  4.     private int port;
  5.    
  6.     public NettyHttpServer(int port) {
  7.         this.port = port;
  8.     }
  9.     public void start() throws Exception {
  10.         logger.info("Netty Http Server is starting.");
  11.         EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
  12.         EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
  13.         try {
  14.             ServerBootstrap serverBootstrap = new ServerBootstrap();
  15.             serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
  16.             .channel(NioServerSocketChannel.class)
  17.             .childHandler(new ChannelInitializer<SocketChannel>() {
  18.                 @Override
  19.                 protected void initChannel(SocketChannel ch) throws Exception {
  20.                     ch.pipeline()
  21.                     //数据进来是自上而下,数据回去时自下而上
  22.                     .addLast("http-decoder", new HttpRequestDecoder())
  23.                     .addLast("http-aggregator", new HttpObjectAggregator(65536))
  24.                     .addLast("http-encoder", new HttpResponseEncoder())
  25.                     .addLast("http-chunked", new ChunkedWriteHandler())
  26.                     .addLast("netty-http-server-handler", new NettyHttpServerHandler());
  27.                 }
  28.             });
  29.             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  30.             logger.info("Netty Http Server is started, listened[" + port + "].");
  31.             channelFuture.channel().closeFuture().sync();
  32.         } finally {
  33.             bossEventLoopGroup.shutdownGracefully();
  34.             workerEventLoopGroup.shutdownGracefully();
  35.         }
  36.     }
  37.     public static void main(String[] args) throws Exception {
  38.         NettyHttpServer nettyHttpServer = new NettyHttpServer(DEFAULT_PORT);
  39.         nettyHttpServer.start();
  40.     }
  41. }
  42. public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
  43.     private static final Logger logger = LogManager.getLogger(NettyHttpServerHandler.class);
  44.     protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
  45.         logger.info(request);
  46.     }
  47. }
复制代码
启动NettyHttpServer,然后在浏览器进行访问,就可以看到输出如下:
  1. Netty Http Server is starting.
  2. Netty Http Server is started, listened[8998].
  3. HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
  4. GET / HTTP/1.1
  5. Host: localhost:8998
  6. Connection: keep-alive
  7. sec-ch-ua: ".Not/A)Brand";v="99", "Google Chrome";v="103", "Chromium";v="103"
  8. sec-ch-ua-mobile: ?0
  9. sec-ch-ua-platform: "macOS"
  10. Upgrade-Insecure-Requests: 1
  11. User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36
  12. Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
  13. Sec-Fetch-Site: none
  14. Sec-Fetch-Mode: navigate
  15. Sec-Fetch-User: ?1
  16. Sec-Fetch-Dest: document
  17. Accept-Encoding: gzip, deflate, br
  18. Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,id;q=0.7,ar;q=0.6
  19. Cookie: _ga=GA1.1.629604539.1641093986
  20. content-length: 0
复制代码
(4)请求的解析处理和响应的编码处理
HTTP服务器最关键的就是请求的解析处理和响应的编码处理,比如会向ChannelPipeline中先后添加不同的解码器和编码器。
 
首先添加HTTP请求消息解码器HttpRequestDecoder,因为浏览器会把按照HTTP协议组织起来的请求数据序列化成字节数组发送给服务器,而HttpRequestDecoder可以按照HTTP协议从接收到的字节数组中读取出一个完整的请求数据。
 
然后添加HttpObjectAggregator解码器,它的作用是将多个消息转换为单一的FullHttpRequest或者FullHttpResponse。
 
接着添加HTTP响应消息编码器HttpResponseEncoder,它的作用是对HTTP响应消息进行编码。
 
以及添加ChunkedWriteHandler处理器,用来支持异步发送大的码流时也不会占用过多的内存,防止内存溢出。
 
最后添加NettyHttpServerHandler处理器,用于处理HTTP服务器的响应输出。
  1. public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
  2.     private static final Logger logger = LogManager.getLogger(NettyHttpServerHandler.class);
  3.     protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
  4.         String method = request.getMethod().name();
  5.         String uri = request.getUri();
  6.         logger.info("Receives Http Request: " + method + " " + uri + ".");
  7.         String html = "<html><body>Hello, I am Netty Http Server.</body></html>";
  8.         ByteBuf byteBuf = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);
  9.         FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
  10.         response.headers().set("content-type", "text/html;charset=UTF-8");
  11.         response.content().writeBytes(byteBuf);
  12.         byteBuf.release();
  13.         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
  14.     }
  15. }
复制代码
 
2.Netty实现WebSocket
(1)HTTP协议的弊端
(2)消息推送之Ajax短轮询
(3)消息推送之WebSocket
(4)WebSocket连接的建立
(5)基于WebSocket协议开发NettyServer
(6)WebSocketServer的请求数据处理逻辑开发
(7)WebSocketServer的HTTP与chunk处理分析
(8)WebSocket网页客户端代码开发与实现
 
(1)HTTP协议的弊端
一.HTTP协议为半双工通信
半双工通信指数据可以在客户端和服务端两个方向传输,但是不能同时传输。它意味着同一时刻,只能有一个方向上的数据在进行传输。
 
二.HTTP消息冗长而繁琐
HTTP消息包含消息头、消息体、换行符等,通常情况下采用文本方式传输。相比于其他的二进制通信协议,冗长而繁琐。
 
(2)消息推送之Ajax短轮询
Ajax短轮询是基于HTTP短连接的。具体就是用一个定时器由浏览器对服务器发出HTTP请求,由服务器返回数据给客户端浏览器。
 
Ajax短轮询的代码实现简单。但由于HTTP请求的Header非常冗长,里面可用数据的比例非常低,所以比较占用带宽和服务器资源,且数据同步不及时。
 
(3)消息推送之WebSocket
WebSocket本质是一个TCP连接,采用的是全双工通信。
 
WebSocket中,浏览器和服务器只需要做一个握手动作,两者之间就会形成一条快速通道进行直接数据传输。由于WebSocket基于TCP双向全双工进行消息传递,所以在同一时刻既可以发送消息,也可以接收消息,比HTTP半双工性能好。
 
WebSocket通过ping/pong帧来保持链路激活,实时性很高,但是浏览器支持度低、代码实现复杂。
 
(4)WebSocket连接的建立
建立WebSocket连接时,需要通过客户端或者浏览器发出握手请求,这个握手请求是一个HTTP请求。该HTTP请求包含了附加头信息"Upgrade:WebSocket",表明这是一个申请协议升级的HTTP请求。
 
服务端解析这些附加的头信息,然后生成应答信息返回给客户端。这样客户端和服务端的WebSocket连接就建立起来了,双方可以通过这个连接通道自由地传递信息,并且这个连接会持续存在直到一方主动关闭连接。
 
(5)基于WebSocket协议开发NettyServer
可以基于TCP协议用Netty来开发客户端和服务端进行相互通信,但粘包半包问题需要手动进行处理。
 
可以基于HTTP协议用Netty来开发一个HTTP服务器,服务器接收浏览器发送过来的HTTP请求后,返回HTTP响应回浏览器。
 
也可以基于WebSocket协议来开发一个Netty服务器,此时前端HTML代码会基于socket协议和Netty服务器建立长连接。这样Netty服务器就可以和浏览器里运行的HTML通过WebSocket协议建立长连接,从而使得Netty服务器可以主动推送数据给浏览器里的HTML页面。
 
WebSocket协议底层也是基于TCP协议来实现的,只不过是在TCP协议的基础上封装了一层更高层次的WebSocket协议。
  1. public class NettyWebSocketServer {
  2.     private static final Logger logger = LogManager.getLogger(NettyWebSocketServer.class);
  3.     private static final int DEFAULT_PORT = 8998;
  4.     private int port;
  5.    
  6.     public NettyWebSocketServer(int port) {
  7.         this.port = port;
  8.     }
  9.    
  10.     public void start() throws Exception {
  11.         logger.info("Netty WebSocket Server is starting.");
  12.         EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
  13.         EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
  14.         try {
  15.             ServerBootstrap serverBootstrap = new ServerBootstrap();
  16.             serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
  17.             .channel(NioServerSocketChannel.class)
  18.             .childHandler(new ChannelInitializer<SocketChannel>() {
  19.                 protected void initChannel(SocketChannel ch) throws Exception {
  20.                     ch.pipeline()
  21.                     .addLast(new HttpServerCodec())
  22.                     .addLast(new ChunkedWriteHandler())
  23.                     .addLast(new HttpObjectAggregator(1024 * 32))
  24.                     .addLast(new WebSocketServerProtocolHandler("/websocket"))
  25.                     .addLast("netty-web-socket-server-handler", new NettyWebSocketServerHandler());
  26.                 }
  27.             });
  28.             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  29.             logger.info("Netty WebSocket Server server is started, listened[" + port + "].");
  30.             channelFuture.channel().closeFuture().sync();
  31.         } finally {
  32.             bossEventLoopGroup.shutdownGracefully();
  33.             workerEventLoopGroup.shutdownGracefully();
  34.         }
  35.     }
  36.    
  37.     public static void main(String[] args) throws Exception {
  38.         NettyWebSocketServer nettyHttpServer = new NettyWebSocketServer(DEFAULT_PORT);
  39.         nettyHttpServer.start();
  40.     }
  41. }
复制代码
(6)WebSocketServer的请求数据处理逻辑开发
  1. public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  2.     private static final Logger logger = LogManager.getLogger(NettyWebSocketServerHandler.class);
  3.     private static ChannelGroup webSocketClients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  4.    
  5.     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  6.         //WebSocket网页代码里发送过来的数据
  7.         String request = msg.text();
  8.         logger.info("Netty Server receives request: " + request + ".");
  9.         TextWebSocketFrame response = new TextWebSocketFrame("Hello, I am Netty Server.");
  10.         ctx.writeAndFlush(response);
  11.     }
  12.    
  13.     //如果一个网页WebSocket客户端跟Netty Server建立了连接之后,会触发这个方法
  14.     @Override
  15.     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  16.         webSocketClients.add(ctx.channel());
  17.     }
  18.    
  19.     @Override
  20.     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  21.         logger.info("websocket client is closed, channel id: " + ctx.channel().id().asLongText() + "[" + ctx.channel().id().asShortText() + "]");
  22.     }
  23. }
复制代码
(7)WebSocketServer的HTTP与chunk处理分析
浏览器里面运行的是HTML网页代码,WebSocket就是在HTML网页代码里嵌入WebSocket代码,可以让浏览器里的HTML网页代码跟NettyServer建立连接,并且是基于长连接发送数据。
 
浏览器会按照WebSocket协议(底层基于HTTP协议)组织请求数据格式,然后把数据序列化成字节数组,接着通过网络连接把字节数组传输发送给NettyServer,NettyServer会通过数据处理链条来进行处理接收到的字节数组数据。
  1. protected void initChannel(SocketChannel ch) throws Exception {
  2.     ch.pipeline()
  3.     //浏览器的字节数组数据进来以后,字节数组数据先用http协议来处理,把字节数组转换为一个HttpRequest请求对象
  4.     //最后数据返回给浏览器前,又会对HttpResponse对象进行编码成字节数组
  5.     .addLast(new HttpServerCodec())
  6.     //chunked write用于大量数据流时的分chunk块,也就是数据实在是太大了就必须得分chunk
  7.     //大量数据流进来时,可以分chunk块来读;大量数据流出去时,可以分chunk块来写;
  8.     .addLast(new ChunkedWriteHandler())
  9.     //如果想要让很多http不要拆分成很多段过来,可以把完整的请求数据聚合到一起再给过来
  10.     .addLast(new HttpObjectAggregator(1024 * 32))
  11.     //基于前面已经转换好的请求数据对象,会在这里基于WebSocket协议再次做一个处理
  12.     //由于传输时是基于http协议传输过来的,而封装的内容是按webSocket协议来封装的http请求数据
  13.     //所以必须在这里提取http请求里面的数据,然后按照WebSocket协议来进行解析处理,把数据提取出来作为WebSocket的数据片段
  14.     .addLast(new WebSocketServerProtocolHandler("/websocket"))
  15.     //响应数据输出时,顺序是反的,第一步原始数据必须先经过WebSocket协议转换
  16.     //WebSocket协议数据,必须经过HTTP协议处理,但最终会encode编码成一个HTTP协议的响应数据
  17.     //然后服务端将HTTP响应数据序列化的字节数组,传输给浏览器
  18.     //浏览器拿到字节数组后进行反序列化,拿到一个HTTP协议响应数据,提取出内容再按照WebSocket协议来处理
  19.     //最终把普通的数据给WebSocket代码
  20.     .addLast("netty-web-socket-server-handler", new NettyWebSocketServerHandler());
  21. }
复制代码
(8)WebSocket网页客户端代码开发与实现
  1. <!DOCTYPE html>
  2. <html lang="en">
  3.     <head>
  4.         <meta http-equiv="content-type" content="text/html; charset=utf-8" />
  5.         <title>websocket网页</title>
  6.     </head>
  7.     <body onload="connectServer();">
  8.         
  9.     </body>
  10. </html>
复制代码
启动Netty Server,然后打开该HTML即可看到console的输出。
 
WebSocket和Netty Server配合起来开发说明:
如果Server端要主动推送一些通知(push)给网页端正在浏览网页的用户,如推送用户可能感兴趣的商品、关注的新闻。那么在用户进入网页后可以询问用户,是否愿意收到服务端发送的xx提示和通知。如果用户愿意,那么网页里的WebSocket完全可以跟NettyServer构建一个长连接。这样NettyServer在必要时,就可以反向推送通知(push)给用户,浏览器里的网页可能会弹出一个push通知用户xx讯息。
 
3.Netty实现的消息推送系统
(1)基于WebSocket的消息推送系统说明
(2)消息推送系统的PushServer
(3)消息推送系统的连接管理封装
(4)消息推送系统的ping-pong探测
(5)消息推送系统的全连接推送
(6)消息推送系统的HTTP响应和握手
(7)消息推送系统的运营客户端
(8)运营客户端连接PushServer
(9)运营客户端的Handler处理器
(10)运营客户端发送推送消息
(11)浏览器客户端接收推送消息
 
(1)基于WebSocket的消息推送系统说明
首先需要一个运营系统能够基于NettyClient和PushServer建立WebSocket长连接,然后浏览器客户端也要和PushServer建立好WebSocket长连接,接着运营系统会让NettyClient发送Push推送消息给PushServer,最后PushServer再把推送消息发送给浏览器客户端。
 
首先启动PushServer,然后打开多个网页客户端查看console,接着启动运营客系统在控制台输入消息,这样就可以完成一个完整的消息推送的交互了。
 
(2)消息推送系统的PushServer
  1. public class NettyPushServer {
  2.     private static final Logger logger = LogManager.getLogger(NettyPushServer.class);
  3.     private static final int DEFAULT_PORT = 8998;
  4.     private int port;
  5.    
  6.     public NettyPushServer(int port) {
  7.         this.port = port;
  8.     }
  9.    
  10.     public void start() throws Exception {
  11.         logger.info("Netty Push Server is starting.");
  12.         EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
  13.         EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
  14.         try {
  15.             ServerBootstrap serverBootstrap = new ServerBootstrap();
  16.             serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
  17.             .channel(NioServerSocketChannel.class)
  18.             .childHandler(new ChannelInitializer<SocketChannel>() {
  19.                 protected void initChannel(SocketChannel ch) throws Exception {
  20.                     ch.pipeline()
  21.                     .addLast("logging", new LoggingHandler("DEBUG"))
  22.                     .addLast("http-codec", new HttpServerCodec())
  23.                     .addLast("aggregator", new HttpObjectAggregator(65536))
  24.                     .addLast("http-chunked", new ChunkedWriteHandler())
  25.                     .addLast("netty-push-server-handler", new NettyPushServerHandler());
  26.                 }
  27.             });
  28.             ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  29.             logger.info("Netty Push Server is started, listened[" + port + "].");
  30.             channelFuture.channel().closeFuture().sync();
  31.         } finally {
  32.             bossEventLoopGroup.shutdownGracefully();
  33.             workerEventLoopGroup.shutdownGracefully();
  34.         }
  35.     }
  36.     public static void main(String[] args) throws Exception {
  37.         NettyPushServer nettyHttpServer = new NettyPushServer(DEFAULT_PORT);
  38.         nettyHttpServer.start();
  39.     }
  40. }
  41. public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
  42.     private static final Logger logger = LogManager.getLogger(NettyPushServerHandler.class);
  43.    
  44.     @Override
  45.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  46.         logger.info("Client Connection Established: " + ctx.channel());
  47.     }
  48.    
  49.     @Override
  50.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  51.         logger.info("Client Disconnected: " + ctx.channel());
  52.     }
  53.    
  54.     @Override
  55.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  56.         if (msg instanceof FullHttpRequest) {
  57.             handleHttpRequest(ctx, (FullHttpRequest) msg);
  58.         } else if(msg instanceof WebSocketFrame) {
  59.             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
  60.         }
  61.     }
  62.    
  63.     @Override
  64.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  65.         ctx.flush();
  66.     }
  67.    
  68.     private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
  69.     }
  70.    
  71.     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
  72.     }
  73. }
复制代码
(3)消息推送系统的连接管理封装
  1. //用来管理连接
  2. public class ChannelManager {
  3.     private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  4.     private static ConcurrentHashMap<String, ChannelId> channelIds = new ConcurrentHashMap<String, ChannelId>();
  5.    
  6.     public static void add(Channel channel) {
  7.         channelGroup.add(channel);
  8.         channelIds.put(channel.id().asShortText(), channel.id());
  9.     }
  10.    
  11.     public static void remove(Channel channel) {
  12.         channelGroup.remove(channel);
  13.         channelIds.remove(channel.id().asShortText());
  14.     }
  15.    
  16.     public static Channel get(String id) {
  17.         return channelGroup.find(channelIds.get(id));
  18.     }
  19.    
  20.     public static void pushToAllChannels(TextWebSocketFrame webSocketFrame) {
  21.         channelGroup.writeAndFlush(webSocketFrame);
  22.     }
  23. }
  24. public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
  25.     ...
  26.     @Override
  27.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  28.         logger.info("Client Connection Established: " + ctx.channel());
  29.         ChannelManager.add(ctx.channel());
  30.     }
  31.     @Override
  32.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  33.         logger.info("Client Disconnected: " + ctx.channel());
  34.         ChannelManager.remove(ctx.channel());
  35.     }
  36.     ...
  37. }
复制代码
(4)消息推送系统的ping-pong探测
  1. public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
  2.     ...
  3.     private WebSocketServerHandshaker webSocketServerHandshaker;
  4.     ...
  5.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  6.         if (msg instanceof FullHttpRequest) {
  7.             handleHttpRequest(ctx, (FullHttpRequest) msg);
  8.         } else if(msg instanceof WebSocketFrame) {
  9.             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
  10.         }
  11.     }
  12.    
  13.     private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
  14.         //WebSocket网页客户端发送的是ping消息,它会不停的ping服务端,看看长连接是否存活和有效
  15.         if (webSocketFrame instanceof PingWebSocketFrame) {
  16.             logger.info("Receive ping frame from client: " + ctx.channel());
  17.             WebSocketFrame pongWebSocketFrame = new PongWebSocketFrame(webSocketFrame.content().retain());
  18.             ctx.channel().write(pongWebSocketFrame);
  19.             return;
  20.         }
  21.         //WebSocket网页客户端发送一个请求过来,请求关闭这个WebSocket连接
  22.         if (webSocketFrame instanceof CloseWebSocketFrame) {
  23.             logger.info("Receive close WebSocket request from client: " + ctx.channel());
  24.             webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) webSocketFrame).retain());
  25.             return;
  26.         }
  27.         ...
  28.     }
  29.     ...
  30. }
复制代码
(5)消息推送系统的全连接推送
  1. public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
  2.     ...
  3.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  4.         if (msg instanceof FullHttpRequest) {
  5.             handleHttpRequest(ctx, (FullHttpRequest) msg);
  6.         } else if(msg instanceof WebSocketFrame) {
  7.             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
  8.         }
  9.     }
  10.    
  11.     private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
  12.         //WebSocket网页客户端发送的是ping消息,它会不停的ping服务端,看看长连接是否存活和有效
  13.         if (webSocketFrame instanceof PingWebSocketFrame) {
  14.             logger.info("Receive ping frame from client: " + ctx.channel());
  15.             WebSocketFrame pongWebSocketFrame = new PongWebSocketFrame(webSocketFrame.content().retain());
  16.             ctx.channel().write(pongWebSocketFrame);
  17.             return;
  18.         }
  19.         
  20.         //WebSocket网页客户端发送一个请求过来,请求关闭这个WebSocket连接
  21.         if (webSocketFrame instanceof CloseWebSocketFrame) {
  22.             logger.info("Receive close WebSocket request from client: " + ctx.channel());
  23.             webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) webSocketFrame).retain());
  24.             return;
  25.         }
  26.         
  27.         //WebSocket网页客户端发送请求,但它不是text文本请求
  28.         if (!(webSocketFrame instanceof TextWebSocketFrame)) {
  29.             logger.error("Netty Push Server only support text frame, does not support other type frame.");
  30.             String errorMsg = String.format("%s type frame is not supported.", webSocketFrame.getClass().getName());
  31.             throw new UnsupportedOperationException(errorMsg);
  32.         }
  33.         
  34.         //WebSocket网页客户端发送一个文本请求过来,是TextFrame类型的
  35.         String request = ((TextWebSocketFrame)webSocketFrame).text();
  36.         logger.info("Receive text frame[" + request + "] from client: " + ctx.channel());
  37.       
  38.         //构建响应
  39.         TextWebSocketFrame response = new TextWebSocketFrame(request);
  40.         //发送给所有连接,全连接推送
  41.         ChannelManager.pushToAllChannels(response);
  42.     }
  43.     ...
  44. }
复制代码
(6)消息推送系统的HTTP响应和握手
  1. public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
  2.     ...
  3.     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
  4.         if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
  5.             DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
  6.             sendHttpResponse(ctx, request, response);
  7.             return;
  8.         }
  9.         logger.info("Receive handshake request from client: " + ctx.channel());
  10.         
  11.         //握手建立
  12.         WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8998/push", null, false);
  13.         webSocketServerHandshaker = factory.newHandshaker(request);
  14.         if (webSocketServerHandshaker == null) {
  15.             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
  16.         } else {
  17.             webSocketServerHandshaker.handshake(ctx.channel(), request);
  18.             logger.info("Netty push server handshake with client: " + ctx.channel());
  19.         }
  20.     }
  21.    
  22.     //HTTP响应
  23.     private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
  24.         if (response.status().code() != RESPONSE_CODE_OK) {
  25.             ByteBuf byteBuf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
  26.             response.content().writeBytes(byteBuf);
  27.             logger.info("Http Response is not ok: " + byteBuf.toString(CharsetUtil.UTF_8));
  28.             byteBuf.release();
  29.         }
  30.         ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);
  31.         if (response.status().code() != RESPONSE_CODE_OK) {
  32.             channelFuture.addListener(ChannelFutureListener.CLOSE);
  33.         }
  34.     }
  35.     ...
  36. }
复制代码
(7)消息推送系统的运营客户端
  1. public class OperationNettyClient {
  2.     private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);
  3.     private static final String WEB_SOCKET_SCHEME = "ws";
  4.     private static final String WSS_SCHEME = "wss";
  5.     private static final String LOCAL_HOST = "127.0.0.1";
  6.     private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");
  7.   
  8.     private static URI uri;
  9.     private static String scheme;
  10.     private static String host;
  11.     private static int port;
  12.     private static SslContext sslContext;
  13.     private EventLoopGroup eventLoopGroup;
  14.     public void start() throws Exception {
  15.         //...
  16.     }
  17.     public static void main(String[] args) throws Exception {
  18.         uri = new URI(PUSH_SERVER_URI);
  19.         scheme = getScheme(uri);
  20.         host = getHost(uri);
  21.         port = getPort(uri, scheme);
  22.         checkScheme(scheme);
  23.         initSslContext(scheme);
  24.     }
  25.    
  26.     private static String getScheme(URI pushServerUri) {
  27.         return pushServerUri.getScheme() == null ? WEB_SOCKET_SCHEME : pushServerUri.getScheme();
  28.     }
  29.    
  30.     private static String getHost(URI pushServerUri) {
  31.         return pushServerUri.getHost() == null ? LOCAL_HOST : pushServerUri.getHost();
  32.     }
  33.    
  34.     private static int getPort(URI pushServerUri, String scheme) {
  35.         int port;
  36.         if (pushServerUri.getPort() == -1) {
  37.             if (WEB_SOCKET_SCHEME.equals(scheme)) {
  38.                 port = 80;
  39.             } else if(WSS_SCHEME.equals(scheme)) {
  40.                 port = 443;
  41.             } else {
  42.                 port = -1;
  43.             }
  44.         } else {
  45.             port = pushServerUri.getPort();
  46.         }
  47.         return port;
  48.     }
  49.    
  50.     //检查scheme是否是ws或wss
  51.     private static void checkScheme(String scheme) {
  52.         if (!WEB_SOCKET_SCHEME.equals(scheme) && !WSS_SCHEME.equals(scheme)) {
  53.             logger.error("Only Support ws or wss scheme.");
  54.             throw new RuntimeException("Only Support ws or wss scheme.");
  55.         }
  56.     }
  57.    
  58.     //如果WebSocket使用了SSL,也就是wss,那么初始化对应的sslContext
  59.     private static void initSslContext(String scheme) throws Exception {
  60.         boolean enableSSL = WSS_SCHEME.equals(scheme);
  61.         if (enableSSL) {
  62.             sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
  63.         } else {
  64.             sslContext = null;
  65.         }
  66.     }
  67. }
复制代码
(8)运营客户端连接PushServer
  1. public class OperationNettyClient {
  2.     private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);
  3.     private static final String WEB_SOCKET_SCHEME = "ws";
  4.     private static final String WSS_SCHEME = "wss";
  5.     private static final String LOCAL_HOST = "127.0.0.1";
  6.     private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");
  7.     private static final String INPUT_MESSAGE_QUIT = "quit";
  8.     private static final String INPUT_MESSAGE_CLOSE = "close";
  9.     private static final String INPUT_MESSAGE_PING = "ping";
  10.     private static URI uri;
  11.     private static String scheme;
  12.     private static String host;
  13.     private static int port;
  14.     private static SslContext sslContext;
  15.     private EventLoopGroup eventLoopGroup;
  16.     public Channel start() throws Exception {
  17.         logger.info("Operation Netty Client is connecting.");
  18.         eventLoopGroup = new NioEventLoopGroup();
  19.         WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
  20.         final OperationNettyClientHandler operationNettyClientHandler = new OperationNettyClientHandler(webSocketClientHandshaker);
  21.         Bootstrap bootstrap = new Bootstrap();
  22.         bootstrap.group(eventLoopGroup)
  23.         .channel(NioSocketChannel.class)
  24.         .handler(new ChannelInitializer<SocketChannel>() {
  25.             protected void initChannel(SocketChannel ch) throws Exception {
  26.                 ChannelPipeline channelPipeline = ch.pipeline();
  27.                 if (sslContext != null) {
  28.                     channelPipeline.addLast(sslContext.newHandler(ch.alloc(), host, port));
  29.                 }
  30.                 channelPipeline.addLast(new HttpClientCodec())
  31.                 .addLast(new HttpObjectAggregator(65536))
  32.                 .addLast(WebSocketClientCompressionHandler.INSTANCE)
  33.                 .addLast(operationNettyClientHandler);
  34.             }
  35.         });
  36.         Channel channel = bootstrap.connect(uri.getHost(), port).sync().channel();
  37.         logger.info("Operation Netty Client connected to push server.");
  38.         operationNettyClientHandler.channelFuture().sync();
  39.         return channel;
  40.     }
  41.    
  42.     public void shutdownGracefully() {
  43.         eventLoopGroup.shutdownGracefully();
  44.     }
  45.    
  46.     public static void main(String[] args) throws Exception {
  47.         uri = new URI(PUSH_SERVER_URI);
  48.         scheme = getScheme(uri);
  49.         host = getHost(uri);
  50.         port = getPort(uri, scheme);
  51.         checkScheme(scheme);
  52.         initSslContext(scheme);
  53.         OperationNettyClient operationNettyClient = new OperationNettyClient();
  54.         try {
  55.             Channel channel = operationNettyClient.start();
  56.         } finally {
  57.             operationNettyClient.shutdownGracefully();
  58.         }
  59.     }
  60.     ...
  61. }
复制代码
(9)运营客户端的Handler处理器
  1. public class OperationNettyClientHandler extends SimpleChannelInboundHandler<Object> {
  2.     private static final Logger logger = LogManager.getLogger(OperationNettyClientHandler.class);
  3.     private WebSocketClientHandshaker webSocketClientHandshaker;
  4.     private ChannelFuture channelFuture;
  5.     public OperationNettyClientHandler(WebSocketClientHandshaker webSocketClientHandshaker) {
  6.         this.webSocketClientHandshaker = webSocketClientHandshaker;
  7.     }
  8.    
  9.     public ChannelFuture channelFuture() {
  10.         return channelFuture;
  11.     }
  12.    
  13.     @Override
  14.     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  15.         channelFuture = ctx.newPromise();
  16.     }
  17.    
  18.     @Override
  19.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  20.         webSocketClientHandshaker.handshake(ctx.channel());
  21.         logger.info("Operation Netty Client send WebSocket handshake request.");
  22.     }
  23.    
  24.     @Override
  25.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  26.         logger.info("netty client disconnected.");
  27.     }
  28.    
  29.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  30.         Channel channel = ctx.channel();
  31.         if (!webSocketClientHandshaker.isHandshakeComplete()) {
  32.             try {
  33.                 webSocketClientHandshaker.finishHandshake(channel, (FullHttpResponse) msg);
  34.                 logger.info("Netty Client connected.");
  35.                 ((ChannelPromise)channelFuture).setSuccess();
  36.             } catch(WebSocketHandshakeException e) {
  37.                 logger.error("WebSocket handshake failed.", e);
  38.                 ((ChannelPromise)channelFuture).setFailure(e);
  39.             }
  40.             return;
  41.         }
  42.         if (msg instanceof FullHttpResponse) {  
  43.             FullHttpResponse response = (FullHttpResponse) msg;
  44.             throw new IllegalStateException("Not Supported HTTP Response.");
  45.         }
  46.         WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
  47.         if (webSocketFrame instanceof TextWebSocketFrame) {
  48.             TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
  49.             logger.info("Receives text frame: " + textWebSocketFrame.text());
  50.         } else if(webSocketFrame instanceof PongWebSocketFrame) {
  51.             logger.info("Receives pong frame: " + webSocketFrame);
  52.         } else if(webSocketFrame instanceof CloseWebSocketFrame) {
  53.             logger.info("Receives close WebSocket frame, Netty Client is closing.");
  54.             channel.close();
  55.         }
  56.     }
  57.     @Override
  58.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  59.         logger.error("Operation Netty client handler exception caught.", cause);
  60.         if (!channelFuture.isDone()) {
  61.             ((ChannelPromise)channelFuture).setFailure(cause);
  62.         }
  63.         ctx.close();
  64.     }
  65. }
复制代码
(10)运营客户端发送推送消息
  1. public class OperationNettyClient {
  2.     ...
  3.     public void waitInputMessage(Channel channel) throws Exception {
  4.         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
  5.         while(true) {
  6.             logger.info("Wait for input message.");
  7.             String message = bufferedReader.readLine();
  8.             if (INPUT_MESSAGE_QUIT.equals(message)) {
  9.                 break;
  10.             } else if(INPUT_MESSAGE_CLOSE.equals(message)) {
  11.                 channel.writeAndFlush(new CloseWebSocketFrame());
  12.                 channel.closeFuture().sync();
  13.                 break;
  14.             } else if(INPUT_MESSAGE_PING.equals(message)) {
  15.                 WebSocketFrame webSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
  16.                 channel.writeAndFlush(webSocketFrame);
  17.             } else {
  18.                 WebSocketFrame webSocketFrame = new TextWebSocketFrame(message);
  19.                 channel.writeAndFlush(webSocketFrame);
  20.             }
  21.         }
  22.     }
  23.    
  24.     public static void main(String[] args) throws Exception {
  25.         uri = new URI(PUSH_SERVER_URI);
  26.         scheme = getScheme(uri);
  27.         host = getHost(uri);
  28.         port = getPort(uri, scheme);
  29.    
  30.         checkScheme(scheme);
  31.         initSslContext(scheme);
  32.    
  33.         OperationNettyClient operationNettyClient = new OperationNettyClient();
  34.         try {
  35.             Channel channel = operationNettyClient.start();
  36.             //运营客户端发送消息入口
  37.             operationNettyClient.waitInputMessage(channel);
  38.         } finally {
  39.             operationNettyClient.shutdownGracefully();
  40.         }
  41.     }
  42.     ...
  43. }
复制代码
(11)浏览器客户端接收推送消息
  1. <!DOCTYPE html>
  2. <html lang="en">
  3.     <head>
  4.         <meta http-equiv="content-type" content="text/html; charset=utf-8" />
  5.         <title>websocket网页</title>
  6.     </head>
  7.     <body onload="connectServer();">
  8.         
  9.     </body>
  10. </html>
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册