之前分享了基于Netty的Socket相关的内容,但是忽略了其中的一个细节。服务端和客户端如果断开连接怎么办,或者如何保证客户端和服务端一直保持连接。
当然是心跳检测。这种技术我们并不陌生,类似Nacos、Euraka等注册中都有心跳的检测,只不过实现的方式不同。这一节,我们做一个简单的了解。
简单来讲,无非就是保活和清理资源。
保活是为了防止防止长时间空闲连接被防火墙/NAT设备断开,这些机制都是处于安全机制的考虑。所以我们要弥补这些安全机制为我们带来的异常。
清理资源是出于内存等资源被无效占用的考虑,大量无效的连接可能会造成内存溢出。短期可能看不出来,长期积累下来服务可能莫名奇妙的就异常了,这种异常还很难察觉。通过心跳检测无效的连接及时清理,保证资源的有效利用。
心跳检测的实现可以用在服务端,也可以用在客户端,当然也可以混用。这个根据实际业务场景而定。
心跳检测的主要实现方式就是在客户端和服务端没有正常数据传输的时间段内,通过发送特定的消息来维持客户端和服务端的连接,而不影响正常的消息处理。
我们将通过三种方式来实现心跳的功能。
这种方式简单粗暴,无需关系业务信息的传输,就是定时发送心跳信息。如果放在服务端,如每隔10s向所有的客户端发送心跳信息;如果是放在客户端端,就是每隔客户端连接服务端之后,每个10s向服务端发送心跳信息。
服务端
服务端的代码:
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(5);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(2048, Unpooled.copiedBuffer("_".getBytes())));
pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
// 业务处理
pipeline.addLast(new BusinessHandler());
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = serverBootstrap.bind(9091).sync();
log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
// 统一发送心跳
new Timer().schedule(new TimerTask() {
@Override
public void run() {
log.info("定时发送心跳...:" + BusinessHandler.channelGroup.size());
BusinessHandler.channelGroup.writeAndFlush("ping...来自服务端心跳_");
}
}, 1000, 10000);
// 对关闭通道进行
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("信息异常:", e);
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
BusinessHandler中用来管理所有的连接的客户端:
BusinessHandler处理接受的消息:
统一发送心跳:
new Timer().schedule(new TimerTask() {
@Override
public void run() {
log.info("定时发送心跳...:" + BusinessHandler.channelGroup.size());
BusinessHandler.channelGroup.writeAndFlush("ping...来自服务端心跳_");
}
}, 1000, 10000);
延迟1s,每隔10s发送一次心跳消息。
客户端
客户端比较简单,接收到心跳消息,响应一下就好了。
效果
问题
这种方式开发比较简单,但是在业务高峰期心跳会和业务数据抢夺资源,造成不必要的资源浪费。
对比上面的统一发送心跳,细粒度的就是针对每一个连接定时发送,因为每个客户端连接上来的时间不一致。发送的时机也不一样。
其他的基本一样。
这种情况针对消息有一定的错峰,接收消息也是一对一的操作。
IdleStateHandlerIdleStateHandler是框架自带的,用与空闲的读写操作。相比前两的粗暴的发送,这种方式可以避开业务高峰,在需要的时候发送心跳。
readerIdleTimeSeconds:空闲读writerIdleTimeSeconds:空闲写allIdleTimeSeconds:空闲读写我们可以根据客户端、服务端的性质来空闲读、空闲写还是空闲读写来发送心跳。
// 推荐配置(根据实际场景调整)
new IdleStateHandler(
60, // 读空闲:客户端60秒无数据
10, // 写空闲:服务端10秒无数据发送
0, // 所有空闲(通常不用)
TimeUnit.SECONDS
);
服务端配置
这里的场景服务端用于发送数据,所以我们可以坚控写空闲来维持连接的可用性。
为了演示的方便性,我们将写空闲设置为10s
写空闲处理
消息的处理
客户端可以不用变,我们直接看效果。
我们可以看到10s未写,则会触发写空闲,从而发送心跳。
上面只是简单的通过案例介绍了心跳的发送,在实际的应用中,我们需要定义消息的类型,通过类型判断是否为心跳消息,从而区分业务消息和心跳消息。还要考虑如果心跳没有被回应,需要考虑删除连接等操作。
这里仅仅提供一种思路,实际业务中考虑的东西会很多,需要我们在实际业务中随机应变!
上一篇: IndexScan比SeqScan返回的结果更少,索引损坏?
下一篇: 剑指offer-67、剪绳⼦