Channel 通道,可以类比nio的channel
NioEventLoop 单线程的线程池,可以处理多个channel,源于nio可多路复用的特点
ChannelInboundHandler 可以有多个,处理进入的事件,主要的业务逻辑写在这里
ChannelOutboundHandler 可以有多个,处理出去的事件,主要的业务逻辑写在这里
ChannelPipeline 管理所有的ChannelInboundHandler和ChannelOutboundHandler
ChannelHandlerContext 上下文,每个Handler有一个自己的Context
过程 每个Channel中绑定一个处理线程池NioEventLoop(或者其他线程池),由线程池分配一个线程负责处理Channel的I/O任务,但是一个线程可能处理多个Channel的I/O任务;Channel中绑定了ChannelPipeline,通过ChannelPipeline可以获取到多个ChannelInboundHandler和ChannelOutboundHandler,而每个ChannelInboundHandler和ChannelOutboundHandler都有自己的ChannelHandlerContext。
private void start() {
//boss线程监听端口
EventLoopGroup boss = new NioEventLoopGroup();
//worker线程负责数据读写
EventLoopGroup worker = new NioEventLoopGroup();
try {
//辅助启动类
ServerBootstrap bootstrap = new ServerBootstrap();
//设置线程池
bootstrap.group(boss, worker);
//设置socket工厂
bootstrap.channel(NioServerSocketChannel.class);
//设置管道工厂
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//获取管道
ChannelPipeline pipeline = socketChannel.pipeline();
//字符串解码器
pipeline.addLast(new StringDecoder());
//字符串编码器
pipeline.addLast(new StringEncoder());
//处理类
pipeline.addLast(new ServerHandler4());
}
});
//绑定端口
ChannelFuture future = bootstrap.bind(8866).sync();
System.out.println("server start ...... ");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch(InterruptedException e) {
e.printStackTrace();
} finally {
//优雅退出,释放线程池资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
(1) EventLoopGroup可以认为是一个线程池或者线程组,此步创建父线程组,负责监听Channel事件,内部创建一个线程组数组,如果没有传入数组大小,数组大小默认为:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
Copy
线程组数组大小取系统变量io.netty.eventLoopThreads和netty可用进程*2的较大值
(2) 创建线程组数组代码如下:
children = new EventExecutor[nThreads];
for(int i = 0; i < nThreads; i++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch(Exception e) {
} finally {
}
}
线程组数组为children = new EventExecutornThreads 线程组数组中的值:children[i] = newChild(executor, args)
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可见线程组数组中存的都是NioEventLoop,而NioEventLoop extends SingleThreadEventLoop,是只有一个活动线程的线程池
创建子线程组,负责处理Channel事件,实现同上。
其实NioEventLoopGroup中默认共有16个线程池,每个线程池是只有一个活动线程的NioEventLoop,NioEventLoop也是线程池,只是只有一个活动线程罢了。NioEventLoop会绑定到一个具体的Channel,负责整个Channel由生到死所有事件的处理,也就是一个Channel中的所有事件都由一个NioEventLoop的一个线程处理(NioEventLoop也只有一个线程)。NioEventLoop个数是固定的,但是客户端和服务端连接的Channel却有很多,所以一个NioEventLoop可以服务于很多个Channel。
创建启动类
设置父线程组和子线程组
内部基于NioServerSocketChannel创建ChannelFactory,ChannelFactory可以生成NioServerSocketChannel
添加handler处理具体的事件,netty内部具体什么时间将handler添加到ChannelPipeline中在后面的代码中
(1) 由port创建java.net.InetSocketAddress
(2) doBind绑定,其中 initAndRegister和doBind0最重要
private ChannelFuture doBind(final SocketAddress localAddress) {
//重点分析
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
//重点分析
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else{省略}
return promise;
}
}
(3) initAndRegister()分为init和register两步
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
//将用户bootstrap.childHandler的事件添加到ChannelPipeline中
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init主要有两个功能:初始化Channel配置、将用户bootstrap.childHandler的事件添加到ChannelPipeline中
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
doRegister()方法可以由initAndRegister()一步步调试得到,javaChannel().register(eventLoop().unwrappedSelector(), 0, this)此处是将java.nio.channels.ServerSocketChannel注册到java.nio.channels.Selector上,已经调试到jdk的代码了
(4) doBind0()
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)调用下面
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
如果jdk大于7,则通过java.nio.channels.ServerSocketChannel#bind(java.net.SocketAddress, int)方法绑定地址;否则通过java.net.ServerSocket#bind(java.net.SocketAddress, int)方法绑定地址。
阻塞住当前线程直到服务器端监听端口关闭