找回密码
 立即注册
首页 业界区 业界 Netty 心跳机制实现(客户端与服务端)

Netty 心跳机制实现(客户端与服务端)

村亢 前天 22:17
Netty 心跳机制实现(客户端与服务端)

Netty 的心跳机制是保持长连接有效性的重要手段,可以检测连接是否存活并及时释放无效连接。下面介绍客户端和服务端的完整实现方案。
一、服务端实现

1. 基础心跳检测
  1. public class HeartbeatServerInitializer extends ChannelInitializer<SocketChannel> {
  2.     @Override
  3.     protected void initChannel(SocketChannel ch) throws Exception {
  4.         ChannelPipeline pipeline = ch.pipeline();
  5.         // 添加编解码器
  6.         pipeline.addLast(new StringDecoder());
  7.         pipeline.addLast(new StringEncoder());
  8.         // 心跳检测
  9.         // 参数说明:readerIdleTime, writerIdleTime, allIdleTime, 时间单位
  10.         pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
  11.         pipeline.addLast(new HeartbeatServerHandler());
  12.     }
  13. }
  14. public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
  15.     // 心跳丢失计数器 不需要单独做心跳机制,它可以通过IdleStateHandler进行检测
  16.     //private Map<String, Integer> lossConnectMap = new HashMap<>();
  17.     @Override
  18.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  19.         if (evt instanceof IdleStateEvent) {
  20.             IdleStateEvent event = (IdleStateEvent) evt;
  21.             if (event.state() == IdleState.READER_IDLE) {
  22.                 ctx.channel().close();
  23.                 //IdleStateHandler 会检测空闲情况,所以直接关闭,不需要再弄计数器,
  24.                 //String socketAddress = ctx.channel().remoteAddress().toString();
  25.                 //int lossConnectCount = 0;
  26.                 //if (lossConnectMap.containsKey(socketAddress)) {
  27.                 //    lossConnectCount = lossConnectMap.get(socketAddress);
  28.                 //}
  29.                 //lossConnectCount++;
  30.                 //lossConnectMap.put(socketAddress, lossConnectCount);
  31.                 //logger.info("关闭不活跃: " + ctx.channel().remoteAddress() + " " + lossConnectCount);
  32.                 //if (lossConnectCount > 2) {
  33.                 //    logger.info("关闭不活跃连接: " + ctx.channel());
  34.                 //    ctx.channel().close();
  35.                 //}
  36.             }
  37.         } else {
  38.             super.userEventTriggered(ctx, evt);
  39.         }
  40.     }
  41.    
  42.     @Override
  43.     public void channelRead(ChannelHandlerContext ctx, Object msg) {
  44.         //Netty 不需要单独做心跳机制,它可以通过IdleStateHandler进行检测
  45.         // 收到任何消息都重置计数器
  46.         //if ("HEARTBEAT".equals(msg)) {
  47.         //    if (lossConnectMap.containsKey(socketAddress)) {
  48.         //        lossConnectMap.put(socketAddress, 0);
  49.         //    }
  50.         //    System.out.println("收到心跳: " + ctx.channel());
  51.         //    ctx.writeAndFlush("HEARTBEAT_RESPONSE");
  52.         //} else {
  53.         //    // 处理其他业务消息
  54.         //}
  55.     }
  56. }
复制代码
2. 完整心跳交互方案
  1. public class AdvancedHeartbeatServerHandler extends ChannelInboundHandlerAdapter {
  2.     private static final ByteBuf HEARTBEAT_SEQUENCE =
  3.         Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));
  4.    
  5.     @Override
  6.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  7.         if (evt instanceof IdleStateEvent) {
  8.             IdleState state = ((IdleStateEvent) evt).state();
  9.             if (state == IdleState.READER_IDLE) {
  10.                 // 读空闲(没有收到客户端消息)
  11.                 System.out.println("读空闲,关闭连接: " + ctx.channel());
  12.                 ctx.close();
  13.             } else if (state == IdleState.WRITER_IDLE) {
  14.                 // 写空闲(可以主动发送心跳包)
  15.                 System.out.println("写空闲,发送心跳包");
  16.                 ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
  17.                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  18.             }
  19.         } else {
  20.             super.userEventTriggered(ctx, evt);
  21.         }
  22.     }
  23.     @Override
  24.     public void channelRead(ChannelHandlerContext ctx, Object msg) {
  25.         String message = (String) msg;
  26.         //Netty 不需要单独做心跳机制,它可以通过IdleStateHandler进行检测
  27.         //if ("HEARTBEAT_REQUEST".equals(message)) {
  28.         //    // 响应客户端心跳
  29.         //    ctx.writeAndFlush("HEARTBEAT_RESPONSE");
  30.         //} else {
  31.         //    // 处理业务消息
  32.         //}
  33.     }
  34. }
复制代码
二、客户端实现

1. 基础心跳实现
  1. public class HeartbeatClientInitializer extends ChannelInitializer<SocketChannel> {
  2.     @Override
  3.     protected void initChannel(SocketChannel ch) throws Exception {
  4.         ChannelPipeline pipeline = ch.pipeline();
  5.         
  6.         pipeline.addLast(new StringDecoder());
  7.         pipeline.addLast(new StringEncoder());
  8.         
  9.         // 客户端设置写空闲检测(定期发送心跳)
  10.         pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
  11.         pipeline.addLast(new HeartbeatClientHandler());
  12.     }
  13. }
  14. public class HeartbeatClientHandler extends ChannelInboundHandlerAdapter {
  15.     @Override
  16.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  17.         if (evt instanceof IdleStateEvent) {
  18.             IdleStateEvent event = (IdleStateEvent) evt;
  19.             if (event.state() == IdleState.WRITER_IDLE) {
  20.                 // 写空闲时发送心跳
  21.                 ctx.writeAndFlush("HEARTBEAT");
  22.                 System.out.println("客户端发送心跳");
  23.             }
  24.         }
  25.     }
  26.    
  27.     @Override
  28.     public void channelRead(ChannelHandlerContext ctx, Object msg) {
  29.         if ("HEARTBEAT_RESPONSE".equals(msg)) {
  30.             System.out.println("收到服务端心跳响应");
  31.         }
  32.     }
  33. }
复制代码
2. 完整心跳交互方案
  1. public class AdvancedHeartbeatClientHandler extends ChannelInboundHandlerAdapter {
  2.     private static final ByteBuf HEARTBEAT_SEQUENCE =
  3.         Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT_REQUEST", CharsetUtil.UTF_8));
  4.    
  5.     @Override
  6.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  7.         // 连接建立后立即发送一次心跳
  8.         sendHeartbeat(ctx);
  9.         super.channelActive(ctx);
  10.     }
  11.    
  12.     @Override
  13.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  14.         if (evt instanceof IdleStateEvent) {
  15.             IdleState state = ((IdleStateEvent) evt).state();
  16.             if (state == IdleState.WRITER_IDLE) {
  17.                 // 写空闲时发送心跳
  18.                 sendHeartbeat(ctx);
  19.             } else if (state == IdleState.READER_IDLE) {
  20.                 // 读空闲(未收到服务端响应)
  21.                 System.out.println("服务端无响应,关闭连接");
  22.                 ctx.close();
  23.             }
  24.         } else {
  25.             super.userEventTriggered(ctx, evt);
  26.         }
  27.     }
  28.    
  29.     private void sendHeartbeat(ChannelHandlerContext ctx) {
  30.         ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
  31.            .addListener(future -> {
  32.                if (!future.isSuccess()) {
  33.                    System.err.println("心跳发送失败: " + future.cause());
  34.                }
  35.            });
  36.     }
  37.    
  38.     @Override
  39.     public void channelRead(ChannelHandlerContext ctx, Object msg) {
  40.         String message = (String) msg;
  41.         if ("HEARTBEAT".equals(message)) {
  42.             // 响应服务端心跳
  43.             ctx.writeAndFlush("HEARTBEAT_RESPONSE");
  44.         } else if ("HEARTBEAT_RESPONSE".equals(message)) {
  45.             // 收到服务端对客户端心跳的响应
  46.             System.out.println("心跳正常");
  47.         }
  48.     }
  49. }
复制代码
三、WebSocket 心跳实现

对于 WebSocket 连接,心跳机制需要特殊处理:
服务端实现
  1. public class WebSocketHeartbeatServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  2.     @Override
  3.     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  4.         String text = msg.text();
  5.         if ("HEARTBEAT".equals(text)) {
  6.             ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT_RESPONSE"));
  7.         } else {
  8.             // 处理其他WebSocket消息
  9.         }
  10.     }
  11.    
  12.     @Override
  13.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  14.         if (evt instanceof IdleStateEvent) {
  15.             IdleStateEvent idleEvent = (IdleStateEvent) evt;
  16.             if (idleEvent.state() == IdleState.READER_IDLE) {
  17.                 ctx.close();
  18.             } else if (idleEvent.state() == IdleState.WRITER_IDLE) {
  19.                 ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT"));
  20.             }
  21.         }
  22.     }
  23. }
复制代码
客户端实现
  1. public class WebSocketHeartbeatClientHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  2.     @Override
  3.     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  4.         String text = msg.text();
  5.         if ("HEARTBEAT".equals(text)) {
  6.             ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT_RESPONSE"));
  7.         }
  8.     }
  9.    
  10.     @Override
  11.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  12.         if (evt instanceof IdleStateEvent) {
  13.             IdleStateEvent event = (IdleStateEvent) evt;
  14.             if (event.state() == IdleState.WRITER_IDLE) {
  15.                 ctx.writeAndFlush(new TextWebSocketFrame("HEARTBEAT"));
  16.             }
  17.         }
  18.     }
  19. }
复制代码
四、最佳实践建议


  • 合理设置超时时间

    • 生产环境建议读空闲时间设置为60-120秒
    • 写空闲时间设置为30-60秒

  • 心跳协议设计

    • 使用固定格式的心跳消息(如"HEARTBEAT")
    • 可以考虑携带时间戳或序列号用于调试

  • 重连机制
    1. // 客户端重连示例
    2. public class ReconnectHandler extends ChannelInboundHandlerAdapter {
    3.     private final Bootstrap bootstrap;
    4.     private int retries = 0;
    5.    
    6.     @Override
    7.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    8.         if (retries < 3) {
    9.             long delay = 1L << retries;
    10.             retries++;
    11.             ctx.channel().eventLoop().schedule(() -> {
    12.                 System.out.println("尝试重连..." + retries);
    13.                 bootstrap.connect();
    14.             }, delay, TimeUnit.SECONDS);
    15.         }
    16.         ctx.fireChannelInactive();
    17.     }
    18. }
    复制代码
  • 监控与日志

    • 记录心跳异常情况
    • 监控连接存活率

  • 性能考虑

    • 使用共享的ByteBuf作为心跳消息
    • 避免在心跳处理器中执行耗时操作

通过以上实现,可以构建健壮的Netty心跳机制,有效维护长连接的可靠性。
Netty 心跳机制中写空闲检测的考量

在 Netty 心跳机制中,写空闲(WRITER_IDLE)检测和读空闲(READER_IDLE)检测各有不同的应用场景和考量因素。是否需要同时使用两者取决于具体业务需求。
一、写空闲检测的主要考虑场景


  • 客户端主动保活(最常见场景)

    • 当客户端需要维持与服务端的连接时(如移动设备通过NAT网关连接)
    • 防止中间设备(路由器、防火墙等)因长时间无数据流动而断开连接
    • 典型实现:客户端定期发送心跳包

  • 服务端主动检测(特殊场景)

    • 当服务端需要确认客户端是否存活但客户端无法主动发送心跳时
    • 双向心跳检测机制中
    • 需要服务端主动推送数据的场景(如实时监控系统)

  • 对称性心跳设计

    • 在金融、支付等对可靠性要求高的系统中
    • 双方向都保持活跃检测,提高连接可靠性

二、是否只需要读空闲检测?

可以仅使用读空闲检测的场景:


  • 纯服务端检测模式

    • 客户端会定期发送数据(包括业务数据和心跳)
    • 服务端只需要检测是否在指定时间内收到任何数据

  • 客户端可靠主动发送心跳

    • 客户端能保证按时发送心跳包
    • 网络环境稳定(如内网通信)

  • 节省资源考虑

    • 减少不必要的写操作
    • 简化心跳逻辑

需要同时使用写空闲检测的场景:


  • NAT环境下的长连接
    1. // 典型NAT环境下的客户端配置
    2. pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); // 只检测写空闲
    复制代码
  • 需要服务端主动保活的系统
    1. // 服务端需要保持连接活跃
    2. pipeline.addLast(new IdleStateHandler(60, 30, 0, TimeUnit.SECONDS)); // 读写都检测
    复制代码
  • 双向心跳验证
    1. // 高可靠性系统的心跳设计
    2. // 服务端:
    3. pipeline.addLast(new IdleStateHandler(60, 45, 0, TimeUnit.SECONDS));
    4. // 客户端:
    5. pipeline.addLast(new IdleStateHandler(75, 30, 0, TimeUnit.SECONDS));
    复制代码
三、实际应用建议

推荐方案1:客户端单边心跳(最常见)
  1. // 客户端配置
  2. pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); // 只检测写空闲
  3. pipeline.addLast(new HeartbeatClientHandler());
  4. // 服务端配置
  5. pipeline.addLast(new IdleStateHandler(90, 0, 0, TimeUnit.SECONDS)); // 只检测读空闲
复制代码
适用场景:大多数移动应用、WebSocket通信等
优点

  • 客户端主动保活,避免NAT超时
  • 服务端只需检测客户端是否存活
  • 实现简单
推荐方案2:双向心跳检测
  1. // 服务端配置
  2. pipeline.addLast(new IdleStateHandler(60, 45, 0, TimeUnit.SECONDS));
  3. // 客户端配置
  4. pipeline.addLast(new IdleStateHandler(75, 30, 0, TimeUnit.SECONDS));
复制代码
适用场景

  • 金融支付系统
  • 物联网关键设备通信
  • 对连接可靠性要求极高的场景
优点

  • 双方向连接状态确认
  • 更高的可靠性
  • 能更快发现单向网络中断情况
推荐方案3:自适应心跳
  1. // 可根据网络条件动态调整
  2. public class AdaptiveIdleStateHandler extends IdleStateHandler {
  3.     private boolean isMobileNetwork;
  4.    
  5.     public AdaptiveIdleStateHandler() {
  6.         super(60, 30, 0, TimeUnit.SECONDS);
  7.     }
  8.    
  9.     @Override
  10.     protected long nextDelay(IdleState state) {
  11.         if (isMobileNetwork && state == IdleState.WRITER_IDLE) {
  12.             return 25; // 移动网络下更频繁发送
  13.         }
  14.         return super.nextDelay(state);
  15.     }
  16. }
复制代码
四、关键决策因素


  • 网络环境

    • 公网/NAT环境:需要写空闲检测
    • 内网环境:可能只需读空闲检测

  • 客户端类型

    • 移动设备:需要主动保活(写空闲)
    • 服务端:通常只需检测客户端是否存活(读空闲)

  • 业务需求

    • 普通消息推送:单边检测足够
    • 金融交易:建议双向检测

  • 资源消耗

    • 写空闲检测会增加少量网络流量
    • 读空闲检测不会产生额外流量

五、典型案例

案例1:IM即时通讯系统
  1. // 客户端(移动设备)
  2. pipeline.addLast(new IdleStateHandler(0, 25, 0, TimeUnit.SECONDS)); // 只写空闲
  3. // 服务端
  4. pipeline.addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)); // 只读空闲
复制代码
理由:移动设备需要保持NAT映射,服务端只需确认客户端是否在线
案例2:物联网数据采集
  1. // 设备端(客户端)
  2. pipeline.addLast(new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS));
  3. // 服务端
  4. pipeline.addLast(new IdleStateHandler(180, 120, 0, TimeUnit.SECONDS));
复制代码
理由:设备可能处于不稳定网络环境,需要双方向检测
总结

是否需要写空闲检测取决于具体场景:

  • 大多数情况下:客户端需要写空闲检测(主动保活),服务端只需读空闲检测
  • 高可靠性系统:建议使用双向检测
  • 内网稳定环境:可能只需读空闲检测
最佳实践是根据实际网络条件和业务需求,选择适当的组合方式。对于公网应用,特别是移动端,写空闲检测通常是必要的。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册