Bootstrap 类
引导类的层次结构中包括了一个抽象的父类和两个具体的引导子类:
ServerBootstrap
致力于使用一个父 Channel 来接受来自客户端的连接,并创建子 Channel 以用于它们之间的通信。Bootstrap
一般只需要一个单独的、没有父 Channel 的 Channel 来用于所有的网络交互。
引导客户端和无协议连接
Bootstrap 类被用于客户端或者使用了无连接协议的应用程序中,Bootstrap 类的 API :
Bootstrap group(EventLoopGroup)
:设置用于处理 Channel 所有事件的 EventLoopGroupBootstrap channel(Class)
/Bootstrap channelFactory( ChannelFactory)
:channel() 方法指定了 Channel 的实现类。如果该实现类没提供默认的构造函数,可以通过调用 channelFactory() 方法来指定一个工厂类,它将会被 bind() 方法调用Bootstrap localAddress(SocketAddress)
:指定 Channel 应该绑定到的本地地址。如果没有指定, 则将由操作系统创建一个随机的地址。或者,也可以通过 bind() 或 connect() 方法指定 localAddressBootstrap option( ChannelOption option, T value)
:设置 ChannelOption,其将被应用到每个新创建的 Channel 的 ChannelConfig。这些选项将会通过 bind() 或者 connect() 方法设置到 Channel,不管哪个先被调用。这个方法在 Channel 已经被创建后再调用 将不会有任何的效果。支持的 ChannelOption 取决于使用的 Channel 类型Bootstrap attr(Attribute key, T value)
:指定新创建的 Channel 的属性值。这些属性值是通过 bind() 或者 connect() 方法设置到 Channel 的,具体取决于谁最先被调用。这个方法在 Channel 被创建后将不会有任何的效果Bootstrap handler(ChannelHandler)
:设置将被添加到 ChannelPipeline 以接收事件通知的 ChannelHandlerBootstrap clone()
:创建一个当前 Bootstrap 的克隆,其具有和原始的 Bootstrap 相同的设置信息Bootstrap remoteAddress(SocketAddress)
:设置远程地址,也可以通过 connect() 方法来指定它ChannelFuture connect()
:连接到远程节点并返回一个 ChannelFuture,其将会在连接操作完成后接收到通知ChannelFuture bind()
:绑定Channel 并返回一个 ChannelFuture ,其将会在绑定操作完成后接收到通知,在那之后必须调用 Channel.connect()方法来建立连接
引导客户端
Bootstrap 类负责为客户端和使用无连接协议的应用程序创建 Channel :
引导一个使用了 NIO TCP 传输的客户端:
EventLoopGroup group = new NioEventLoopGroup();
// 创建一个 Bootstrap 类的实例以创建和连接新的客户端 Channel
Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup,提供用于处理 Channel 事件的 EventLoop
bootstrap.group(group)
// 指定要使用的 Channel 实现
.channel(NioSocketChannel.class)
// 设置用于 Channel 事件和数据的 ChannelInboundHandler
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Received data");
}
});
// 连接到远程主机
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
// 添加监听器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
}
}
});
Channel 和 EventLoopGroup 的兼容性
相互兼容的 EventLoopGroup 和 Channel :
引导的时候 EventLoopGroup 和 Channel 要用同类前缀,比如上面我们实例里用到的 NioEventLoopGroup
和 NioSocketChannel
,否则会导致 IllegalStateException 异常。
引导服务器
ServerBootstrap 类的 API :
group
:设置 ServerBootstrap 要用的 EventLoopGroup。这个 EventLoopGroup 将用于 ServerChannel 和被接受的子 Channel 的 I/O 处理channel
:设置将要被实例化的 ServerChannel 类channelFactory
:如果不能通过默认的构造函数创建 Channel ,那么可以提供一个 ChannelFactorylocalAddress
:指定 ServerChannel 应该绑定到的本地地址。如果没有指定,则将由操作系统使用一个随机地址。或者,可以通过 bind() 方法来指定该 localAddressoption
:指定要应用到新创建的 ServerChannel 的 ChannelConfig 的 ChannelOption。这些选项将会通过 bind() 方法设置到 Channel。在 bind() 方法被调用之后,设置或者改变 ChannelOption 都不会有任何的效果。所支持的 ChannelOption 取决于所使用的 Channel 类型childOption
:指定当子 Channel 被接受时,应用到子 Channel 的 ChannelConfig 的 ChannelOption。所支持的 ChannelOption 取决于所使用的 Channel 的类型attr
:指定 ServerChannel 上的属性,属性将会通过 bind() 方法设置给 Channel。 在调用 bind() 方法之后改变它们将不会有任何的效果childAttr
:将属性设置给已经被接受的子 Channel。接下来的调用将不会有任何的效果handler
:设置被添加到 ServerChannel 的 ChannelPipeline 中的 ChannelHandler。childHandler
:设置将被添加到已被接受的子 Channel 的 ChannelPipeline 中的 ChannelHandler。handler() 方法和 childHandler() 方法之间的区别是:前者所添加的 ChannelHandler 由接受子 Channel 的 ServerChannel 处理,而 childHandler() 方法所添加的 ChannelHandler 将由已被接受的子 Channel 处理,其代表一个绑定到远程节点的套接字clone
:克隆一个设置和原始的 ServerBootstrap 相同的 ServerBootstrapbind
:绑定 ServerChannel 并且返回一个 ChannelFuture,其将会在绑定操作完成后收到通知(带着成功或者失败的结果)
ServerChannel 的实现负责创建子 Channel ,这些子 Channel 代表了已被接受的连接。ServerBootstrap 在 bind()
方法被调用时创建了一个 ServerChannel ,并且该 ServerChannel 管理多个子 Channel 。
引导服务器:
NioEventLoopGroup group = new NioEventLoopGroup();
// 创建 ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 EventLoopGroup ,其提供了用于处理 Channel 事件的 EventLoop
bootstrap.group(group)
// 指定要使用的 Channel 实现
.channel(NioServerSocketChannel.class)
// 设置用于处理已被接受的子 Channel 的 I/O 及数据的 ChannelInboundHandler
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
});
// 通过配置好的 ServerBootstrap 的实例绑定该 Channel(NioServerSocketChannel)
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bound attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
从 Channel 引导客户端
假设你的服务器正在处理一个客户端的请求,这个请求需要它充当第三方系统的客户端。当一个应用程序(如一个代理服务器)必须要和组织现有的系统(如 Web 服或数据库)集成时,就可能发生这种情况。这种情况下,将需要从已经被接受的子 Channel 中引导一个客户端 Channel 。
通过将已被接受的子 Channel 的 EventLoop 传递给 Bootstrap 的 group()
方法来共享该 EventLoop 。因为分配给 EventLoop 的所有 Channel 都使用同一个线程,所以这避免了额外的线程创建,以及相关的上下文切换。
从 Channel 引导客户端:
// 创建 ServerBootstrap 以创建 ServerSocketChannel ,并绑定它
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
// 指定要使用的 Channel 实现
.channel(NioServerSocketChannel.class)
// 设置用于处理已被接受的子 Channel 的 I/O 和数据的 ChannelInboundHandler
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
ChannelFuture connectFuture;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 创建一个 Bootstrap 类的实例以连接到远程主机
Bootstrap bootstrap = new Bootstrap();
// 指定要使用的 Channel 实现
bootstrap.channel(NioSocketChannel.class)
// 为入站 I/O 设置 ChannelInboundHandler
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println("Received data");
}
});
// 使用与分配给已被接受的子 Channel 相同的 EventLoop
bootstrap.group(ctx.channel().eventLoop());
// 连接到远程节点
connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
if (connectFuture.isDone()) {
// 当连接完成时,执行一些数据操作(如代理)
}
}
});
// 通过配置好的 ServerBootstrap 绑定该 ServerSocketChannel
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
在引导过程中添加多个 ChannelHandler
引导服务器和使用 ChannelInitializer ,将多个 ChannelHandler 添加到一个 ChannelPipeline 中:
// 创建 ServerBootStrap 以创建和绑定新的 Channel
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 EventLoopGroup ,其将提供用以处理 Channel 事件的 EventLoop
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
// 指定 Channel 的实现
.channel(NioServerSocketChannel.class)
// 注册一个 ChannelInitializerImpl 的实例来设置 ChannelPipeline
.childHandler(new ChannelInitializerImpl());
// 绑定到地址
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();
自定义 ChannelInitializer 实现类:
// 将添加所需的 ChannelHandler 添加到 ChannelPipeline
final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
使用 Netty 的 ChannelOption 和属性
Bootstrap 可以通过 option()
方法来设置 ChannelOption
,引导创建的所有 Channel 都会被应用到 ChannelOption 中设置的值。包括底层连接的详细信息,如 keep-alive
或者超时属性以及缓冲区的设置。
在某些常用的属性和数据不可用时,Netty 提供了 AttributeMap
(一个由 Channel 和引导类提供的集合)抽象以及 AttributeKey<T>
(一个用于插入和获取属性值的泛型类)。可以安全地将任何类型的数据与客户端和服务器 Channel 相关联了。例如,跟踪用户和 Channel 之间的关系的服务器应用程序,可以通过将用户的 ID 存储为 Channel 的一个属性来完成。类似的技术可以被用来基于用户的 ID 将信息路由给用户,或者关闭活动较少的 Channel 。
使用属性值:
// 创建一个 AttributeKey以标识该属性
final AttributeKey<Integer> id = AttributeKey.newInstance("ID");
// 创建客户端 Channel 并连接它们
Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup ,其提供了用以处理 Channel 事件的 EventLoop
bootstrap.group(new NioEventLoopGroup())
// 指定 Channel 的实现
.channel(NioSocketChannel.class)
// 设置用以处理 Channel 的 I/O 以及数据的 ChannelInboundHandler
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// 使用 AttributeKey 检索属性以及它的值
Integer idValue = ctx.channel().attr(id).get();
// do something with the idValue
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
});
// 设置 ChannelOption ,其将在 connect() 或者 bind() 方法被调用时,设置到已经创建的 Channel 上
bootstrap.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// 存储该 id 属性
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();
引导 DatagramChannel
Netty 提供了各种 DatagramChannel
的实现,用于无连接的协议。和前面基于 TCP 协议的 SocketChannel 唯一的区别是引导类不用再调用 connect()
方法,而是调用 bind()
方法。
使用 Bootstrap 和 DatagramChannel :
Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup ,其提供了用以处理 Channel 事件的 EventLoop
bootstrap.group(new OioEventLoopGroup())
// 指定 Channel 的实现
.channel(OioDatagramChannel.class)
// 设置用以处理 Channel 的 I/O 以及数据的 ChannelInboundHandler
.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
// Do something with the packet
}
});
// 调用 bind() 方法,因为该协议是无连接的
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Channel bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
关闭
引导让应用程序启动并运行起来,但是关闭的时候也需要优雅的关闭。需要关闭 EventLoopGroup ,它将处理任何挂起的事件和任务,并且随后释放所有活动的线程。这就是调用 EventLoopGroup.shutdownGracefully()
方法的作用。这个方法是一个异步的操作,需要阻塞等待直到它完成,或者给返回的 Future 注册一个 监听器。
优雅关闭:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
... .channel(NioSocketChannel.class);
// shutdownGracefully() 方法将释放所有的资源,并且关闭所有当前正在使用的 Channel
Future<?> future = group.shutdownGracefully();
或者,也可以在调用 EventLoopGroup.shutdownGracefully()
方法之前,显式地在所有活动的 Channel 上调用 Channel.close()
方法。但是在任何情况下,都记得关闭 EventLoopGroup 。