springboot 整合netty编写时间服务器
这个例子与上个例子(springboot 整合netty做心跳检测)最大的不同就是,服务端发送包含32位整数的消息,而不接收任何请求,并在发送消息后关闭连接。
因为我们将忽略任何接收到的数据,一旦建立连接就发送消息,这次我们不能使用channelRead()方法。 相反,我们应该重写channelActive()方法。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.21.Final</version>
</dependency>
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
/**
* 这次我们不能使用channelRead()方法。相反,我们应该重写channelActive()方法
*/
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
/**
* 要发送新消息,我们需要分配一个包含消息的新缓冲区。我们要写一个32位整数,
* 因此我们需要一个容量至少为4个字节的ByteBuf。
* 通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分配一个新的缓冲区。
*/
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture cf = ctx.writeAndFlush(time);
/**
* 我们怎么知道写请求是否完成?这就像向返回的ChannelFuture添加ChannelFutureListener一样。
* 在这里,我们创建了一个新的匿名ChannelFutureListener,它在操作完成时关闭Channel
*/
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert cf == future;
ctx.close();
}
});*/
//为了简化代码也可以这么写
cf.addListener(ChannelFutureListener.CLOSE);
}
/**
* 当由于I / O错误或由于处理事件时抛出异常导致的处理程序实现而由Netty引发异常时,使用Throwable调用exceptionCaught()事件处理程序方法。
* 在大多数情况下,应记录捕获的异常并在此处关闭其关联的通道,尽管此方法的实现可能会有所不同,具体取决于您要处理特殊情况的操作。
* 例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
当然这也需要一个服务启动引导类 和上个例子(springboot 整合netty做心跳检测)基本是一样,更多详细说明已经在上个例子(springboot 整合netty做心跳检测)中进行了说明,这个就不在阐述
@Component
public class DiscardServer {
@Value("${netty.server.port}")
private int port;
@PostConstruct
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定并开始接受传入连接。
ChannelFuture f = b.bind(port).sync();
// 等到服务器套接字关闭。
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
@Component
public class TimeClient {
@Value("${netty.server.port}")
private int port;
@Value("${netty.server.host}")
private String host;
@PostConstruct
public void timeClient() throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup).channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//启动客户端
ChannelFuture f = bootstrap.connect(host, port).sync();
//等到连接关闭
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
同样这里不做代码解释代码解释说明都放在了上个例子(springboot 整合netty做心跳检测)
下面代码接受服务端的消息并打印
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//Netty将从对等方发送的数据读入ByteBuf
ByteBuf m = (ByteBuf) msg;
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println("收到服务端发送的消息"+new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}