引导应用程序-Netty笔记(八)

/ 0评 / 0

Bootstrap 类

引导类的层次结构中包括了一个抽象的父类和两个具体的引导子类:

ServerBootstrap 致力于使用一个父 Channel 来接受来自客户端的连接,并创建子 Channel 以用于它们之间的通信。Bootstrap 一般只需要一个单独的、没有父 Channel 的 Channel 来用于所有的网络交互。

引导客户端和无协议连接

Bootstrap 类被用于客户端或者使用了无连接协议的应用程序中,Bootstrap 类的 API :

引导客户端

Bootstrap 类负责为客户端和使用无连接协议的应用程序创建 Channel :

引导一个使用了 NIO TCP 传输的客户端:

EventLoopGroup group = new NioEventLoopGroup();
// 创建一个 Bootstrap 类的实例以创建和连接新的客户端 Channel
Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup,提供用于处理 Channel 事件的 EventLoop
bootstrap.group(group)
        // 指定要使用的 Channel 实现
        .channel(NioSocketChannel.class)
        // 设置用于 Channel 事件和数据的 ChannelInboundHandler
        .handler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                System.out.println("Received data");
            }
        });

// 连接到远程主机
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
// 添加监听器
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Connection established");
        } else {
            System.err.println("Connection attempt failed");
        }
    }
});

Channel 和 EventLoopGroup 的兼容性

相互兼容的 EventLoopGroup 和 Channel :

引导的时候 EventLoopGroup 和 Channel 要用同类前缀,比如上面我们实例里用到的 NioEventLoopGroupNioSocketChannel ,否则会导致 IllegalStateException 异常。

引导服务器

ServerBootstrap 类的 API :

ServerChannel 的实现负责创建子 Channel ,这些子 Channel 代表了已被接受的连接。ServerBootstrap 在 bind() 方法被调用时创建了一个 ServerChannel ,并且该 ServerChannel 管理多个子 Channel 。

引导服务器:

NioEventLoopGroup group = new NioEventLoopGroup();
// 创建 ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 EventLoopGroup ,其提供了用于处理 Channel 事件的 EventLoop
bootstrap.group(group)
        // 指定要使用的 Channel 实现
        .channel(NioServerSocketChannel.class)
        // 设置用于处理已被接受的子 Channel 的 I/O 及数据的 ChannelInboundHandler
        .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                System.out.println("Received data");
            }
        });
// 通过配置好的 ServerBootstrap 的实例绑定该 Channel(NioServerSocketChannel)
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Server bound");
        } else {
            System.err.println("Bound attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
});

从 Channel 引导客户端

假设你的服务器正在处理一个客户端的请求,这个请求需要它充当第三方系统的客户端。当一个应用程序(如一个代理服务器)必须要和组织现有的系统(如 Web 服或数据库)集成时,就可能发生这种情况。这种情况下,将需要从已经被接受的子 Channel 中引导一个客户端 Channel 。

通过将已被接受的子 Channel 的 EventLoop 传递给 Bootstrap 的 group() 方法来共享该 EventLoop 。因为分配给 EventLoop 的所有 Channel 都使用同一个线程,所以这避免了额外的线程创建,以及相关的上下文切换。

从 Channel 引导客户端:

// 创建 ServerBootstrap 以创建 ServerSocketChannel ,并绑定它
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
        // 指定要使用的 Channel 实现
        .channel(NioServerSocketChannel.class)
        // 设置用于处理已被接受的子 Channel 的 I/O 和数据的 ChannelInboundHandler
        .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {

            ChannelFuture connectFuture;

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                // 创建一个 Bootstrap 类的实例以连接到远程主机
                Bootstrap bootstrap = new Bootstrap();
                // 指定要使用的 Channel 实现
                bootstrap.channel(NioSocketChannel.class)
                        // 为入站 I/O 设置 ChannelInboundHandler
                        .handler(new SimpleChannelInboundHandler<ByteBuf>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
                                System.out.println("Received data");
                            }
                        });
                // 使用与分配给已被接受的子 Channel 相同的 EventLoop
                bootstrap.group(ctx.channel().eventLoop());
                // 连接到远程节点
                connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
            }

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                if (connectFuture.isDone()) {
                    // 当连接完成时,执行一些数据操作(如代理)
                }
            }
        });

// 通过配置好的 ServerBootstrap 绑定该 ServerSocketChannel
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Server bound");
        } else {
            System.err.println("Bind attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
});

在引导过程中添加多个 ChannelHandler

引导服务器和使用 ChannelInitializer ,将多个 ChannelHandler 添加到一个 ChannelPipeline 中:

// 创建 ServerBootStrap 以创建和绑定新的 Channel
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 设置 EventLoopGroup ,其将提供用以处理 Channel 事件的 EventLoop
        bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                // 指定 Channel 的实现
                .channel(NioServerSocketChannel.class)
                // 注册一个 ChannelInitializerImpl 的实例来设置 ChannelPipeline
                .childHandler(new ChannelInitializerImpl());

        // 绑定到地址
        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
        future.sync();

自定义 ChannelInitializer 实现类:

// 将添加所需的 ChannelHandler 添加到 ChannelPipeline
    final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpClientCodec());
            pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
        }
    }

使用 Netty 的 ChannelOption 和属性

Bootstrap 可以通过 option() 方法来设置 ChannelOption ,引导创建的所有 Channel 都会被应用到 ChannelOption 中设置的值。包括底层连接的详细信息,如 keep-alive 或者超时属性以及缓冲区的设置。

在某些常用的属性和数据不可用时,Netty 提供了 AttributeMap (一个由 Channel 和引导类提供的集合)抽象以及 AttributeKey<T> (一个用于插入和获取属性值的泛型类)。可以安全地将任何类型的数据与客户端和服务器 Channel 相关联了。例如,跟踪用户和 Channel 之间的关系的服务器应用程序,可以通过将用户的 ID 存储为 Channel 的一个属性来完成。类似的技术可以被用来基于用户的 ID 将信息路由给用户,或者关闭活动较少的 Channel 。

使用属性值:

// 创建一个 AttributeKey以标识该属性
final AttributeKey<Integer> id = AttributeKey.newInstance("ID");
// 创建客户端 Channel 并连接它们
Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup ,其提供了用以处理 Channel 事件的 EventLoop
bootstrap.group(new NioEventLoopGroup())
        // 指定 Channel 的实现
        .channel(NioSocketChannel.class)
        // 设置用以处理 Channel 的 I/O 以及数据的 ChannelInboundHandler
        .handler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                // 使用 AttributeKey 检索属性以及它的值
                Integer idValue = ctx.channel().attr(id).get();
                // do something with the idValue
            }

            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
                System.out.println("Received data");
            }
        });

// 设置 ChannelOption ,其将在 connect() 或者 bind() 方法被调用时,设置到已经创建的 Channel 上
bootstrap.option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// 存储该 id 属性
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();

引导 DatagramChannel

Netty 提供了各种 DatagramChannel 的实现,用于无连接的协议。和前面基于 TCP 协议的 SocketChannel 唯一的区别是引导类不用再调用 connect() 方法,而是调用 bind() 方法。

使用 Bootstrap 和 DatagramChannel :

Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup ,其提供了用以处理 Channel 事件的 EventLoop
bootstrap.group(new OioEventLoopGroup())
        // 指定 Channel 的实现
        .channel(OioDatagramChannel.class)
        // 设置用以处理 Channel 的 I/O 以及数据的 ChannelInboundHandler
        .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
            @Override
            public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                // Do something with the packet
            }
        });

// 调用 bind() 方法,因为该协议是无连接的
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Channel bound");
        } else {
            System.err.println("Bind attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
});

关闭

引导让应用程序启动并运行起来,但是关闭的时候也需要优雅的关闭。需要关闭 EventLoopGroup ,它将处理任何挂起的事件和任务,并且随后释放所有活动的线程。这就是调用 EventLoopGroup.shutdownGracefully() 方法的作用。这个方法是一个异步的操作,需要阻塞等待直到它完成,或者给返回的 Future 注册一个 监听器。

优雅关闭:

EventLoopGroup group = new NioEventLoopGroup(); 
Bootstrap bootstrap = new Bootstrap(); 
bootstrap.group(group)
... .channel(NioSocketChannel.class);
// shutdownGracefully() 方法将释放所有的资源,并且关闭所有当前正在使用的 Channel
Future<?> future = group.shutdownGracefully(); 

或者,也可以在调用 EventLoopGroup.shutdownGracefully() 方法之前,显式地在所有活动的 Channel 上调用 Channel.close() 方法。但是在任何情况下,都记得关闭 EventLoopGroup

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注