第一款Netty应用程序-Netty笔记(二)

/ 0评 / 0

编写服务器

所有的 Netty 服务器都需要以下两个部分:

我们编写一款最简单的应用程序,接收到客户端的消息,并把消息发送给客户端。因为我们的应用程序会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口,用于来定义响应入站(inbound)事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以直接继承 ChannelInboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。

服务器的 ChannelHandler 组件实现:

// 标示一个 ChannelHandler 可以被多个 Channel 安全地共享
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 对于每个传入的消息都要调用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));

        // 将接收到的消息写给发送者,而不冲刷出站消息
        ctx.write(in);
    }

    /**
     * 通知 ChannelInboundHandler 最后一次对 channelRead() 的调用是当前批量读取中的最后一条消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 将未决消息冲刷到远程节点,并关闭该 Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在读取操作期间,有异常抛出时会调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 打印异常栈跟踪
        cause.printStackTrace();
        // 关闭该 Channel
        ctx.close();
    }
}

引导服务器的过程涉及:

引导服务器:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group)
                    // 指定所使用的 NIO 传输 Channel
                    .channel(NioServerSocketChannel.class)
                    // 使用指定的端口设置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    // 添加一个 EchoServerHandler 到子 Channel 的 ChannelPipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // EchoServerHandler 被标注为 @Sharable,所以我们可以总是使用同样的实例
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            // 异步地绑定服务器,调用 sync() 方法阻塞等待直到绑定完成
            ChannelFuture future = bootstrap.bind().sync();
            // 获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
            future.channel().closeFuture().sync();
        } finally {
            // 关闭 EventLoopGroup ,释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8888).start();
    }
}

编写客户端

如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler ,在我们这个应用程序场景下,我们可以直接继承 SimpleChannelInboundHander 类即可处理所有需要的任务。

客户端的 ChannelHandler 组件实现:

// 标记该注解的实例可以被多个 Channel 共享
@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 连接建立
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 当被通知 Channel 是活跃的时候,发送一条消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    /**
     * 每当接收到数据
     * TCP保证了字节数组将会按照服务器发送它们的顺序被接收
     *
     * @param ctx
     * @param msg Netty 的字节容器
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // 记录已接收消息的转储
        System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 在发送异常时,记录错误并关闭 Channel
        cause.printStackTrace();
        ctx.close();
    }

}

引导客户端:

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    // 适用于 NIO 传输的 Channel 类型
                    .channel(NioSocketChannel.class)
                    // 设置服务器的 InetSocketAddress
                    .remoteAddress(new InetSocketAddress(host, port))
                    // 在创建 Channel 时(连接被建立时),向 ChannelPipeline 中添加一个 EchoClientHandler 实例
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // 连接到远程节点,阻塞等待直到连接完成
            ChannelFuture future = bootstrap.connect().sync();
            // 阻塞,直到 Channel 关闭
            future.channel().closeFuture().sync();
        } finally {
            // 关闭线程池并且释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("127.0.0.1", 8888).start();
    }
}

测试

先启动服务器,然后启动客户端。结果服务器接收到客户端发送的信息,打印信息并将信息返回给客户端。客户端接收到服务器返回的信息,打印信息并退出。

服务器日志

客户端日志

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注