编写服务器
所有的 Netty 服务器都需要以下两个部分:
- 至少一个 ChannelHandler ,该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
- 引导,这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听的连接请求的端口上。
我们编写一款最简单的应用程序,接收到客户端的消息,并把消息发送给客户端。因为我们的应用程序会响应传入的消息,所以它需要实现 ChannelInboundHandler
接口,用于来定义响应入站(inbound)事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以直接继承 ChannelInboundHandlerAdapter
类也就足够了,它提供了 ChannelInboundHandler
的默认实现。
服务器的 ChannelHandler 组件实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
// 标示一个 ChannelHandler 可以被多个 Channel 安全地共享 @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 对于每个传入的消息都要调用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); // 将接收到的消息写给发送者,而不冲刷出站消息 ctx.write(in); } /** * 通知 ChannelInboundHandler 最后一次对 channelRead() 的调用是当前批量读取中的最后一条消息 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 将未决消息冲刷到远程节点,并关闭该 Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } /** * 在读取操作期间,有异常抛出时会调用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印异常栈跟踪 cause.printStackTrace(); // 关闭该 Channel ctx.close(); } } |
引导服务器的过程涉及:
- 绑定到服务器将在其上监听并接受传入的连接请求的端口。
- 配置 Channel ,以及将有关的入站消息通知给 ChannelHandler 实例。
引导服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) // 指定所使用的 NIO 传输 Channel .channel(NioServerSocketChannel.class) // 使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(port)) // 添加一个 EchoServerHandler 到子 Channel 的 ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // EchoServerHandler 被标注为 @Sharable,所以我们可以总是使用同样的实例 ch.pipeline().addLast(serverHandler); } }); // 异步地绑定服务器,调用 sync() 方法阻塞等待直到绑定完成 ChannelFuture future = bootstrap.bind().sync(); // 获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成 future.channel().closeFuture().sync(); } finally { // 关闭 EventLoopGroup ,释放所有的资源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoServer(8888).start(); } } |
编写客户端
如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler
,在我们这个应用程序场景下,我们可以直接继承 SimpleChannelInboundHander
类即可处理所有需要的任务。
客户端的 ChannelHandler 组件实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
// 标记该注解的实例可以被多个 Channel 共享 @Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 连接建立 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 当被通知 Channel 是活跃的时候,发送一条消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } /** * 每当接收到数据 * TCP保证了字节数组将会按照服务器发送它们的顺序被接收 * * @param ctx * @param msg Netty 的字节容器 * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // 记录已接收消息的转储 System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 在发送异常时,记录错误并关闭 Channel cause.printStackTrace(); ctx.close(); } } |
引导客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) // 适用于 NIO 传输的 Channel 类型 .channel(NioSocketChannel.class) // 设置服务器的 InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) // 在创建 Channel 时(连接被建立时),向 ChannelPipeline 中添加一个 EchoClientHandler 实例 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); // 连接到远程节点,阻塞等待直到连接完成 ChannelFuture future = bootstrap.connect().sync(); // 阻塞,直到 Channel 关闭 future.channel().closeFuture().sync(); } finally { // 关闭线程池并且释放所有的资源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("127.0.0.1", 8888).start(); } } |
测试
先启动服务器,然后启动客户端。结果服务器接收到客户端发送的信息,打印信息并将信息返回给客户端。客户端接收到服务器返回的信息,打印信息并退出。

