netty原理图及服务端启动过程

image.png

Channel 通道,可以类比nio的channel

NioEventLoop 单线程的线程池,可以处理多个channel,源于nio可多路复用的特点

ChannelInboundHandler 可以有多个,处理进入的事件,主要的业务逻辑写在这里

ChannelOutboundHandler 可以有多个,处理出去的事件,主要的业务逻辑写在这里

ChannelPipeline 管理所有的ChannelInboundHandler和ChannelOutboundHandler

ChannelHandlerContext 上下文,每个Handler有一个自己的Context

过程 每个Channel中绑定一个处理线程池NioEventLoop(或者其他线程池),由线程池分配一个线程负责处理Channel的I/O任务,但是一个线程可能处理多个Channel的I/O任务;Channel中绑定了ChannelPipeline,通过ChannelPipeline可以获取到多个ChannelInboundHandler和ChannelOutboundHandler,而每个ChannelInboundHandler和ChannelOutboundHandler都有自己的ChannelHandlerContext。


启动代码

 private void start() {
        //boss线程监听端口
        EventLoopGroup boss = new NioEventLoopGroup();
        //worker线程负责数据读写
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置线程池
            bootstrap.group(boss, worker);
            //设置socket工厂
            bootstrap.channel(NioServerSocketChannel.class);
            //设置管道工厂
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //字符串解码器
                    pipeline.addLast(new StringDecoder());
                    //字符串编码器
                    pipeline.addLast(new StringEncoder());
                    //处理类
                    pipeline.addLast(new ServerHandler4());
                }
            });
            //绑定端口
            ChannelFuture future = bootstrap.bind(8866).sync();
            System.out.println("server start ...... ");
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch(InterruptedException e) {
            e.printStackTrace();
        } finally {
            //优雅退出,释放线程池资源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }



分析代码


1. EventLoopGroup boss = new NioEventLoopGroup()

(1) EventLoopGroup可以认为是一个线程池或者线程组,此步创建父线程组,负责监听Channel事件,内部创建一个线程组数组,如果没有传入数组大小,数组大小默认为:

  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

Copy

线程组数组大小取系统变量io.netty.eventLoopThreads和netty可用进程*2的较大值

(2) 创建线程组数组代码如下:

        children = new EventExecutor[nThreads];
        for(int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch(Exception e) {
            } finally {
            }
        }


线程组数组为children = new EventExecutornThreads 线程组数组中的值:children[i] = newChild(executor, args)

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }


可见线程组数组中存的都是NioEventLoop,而NioEventLoop extends SingleThreadEventLoop,是只有一个活动线程的线程池


2. EventLoopGroup worker = new NioEventLoopGroup()

创建子线程组,负责处理Channel事件,实现同上。

其实NioEventLoopGroup中默认共有16个线程池,每个线程池是只有一个活动线程的NioEventLoop,NioEventLoop也是线程池,只是只有一个活动线程罢了。NioEventLoop会绑定到一个具体的Channel,负责整个Channel由生到死所有事件的处理,也就是一个Channel中的所有事件都由一个NioEventLoop的一个线程处理(NioEventLoop也只有一个线程)。NioEventLoop个数是固定的,但是客户端和服务端连接的Channel却有很多,所以一个NioEventLoop可以服务于很多个Channel。


3. ServerBootstrap bootstrap = new ServerBootstrap()

创建启动类


4. bootstrap.group(boss, worker)

设置父线程组和子线程组


5. bootstrap.channel(NioServerSocketChannel.class)

内部基于NioServerSocketChannel创建ChannelFactory,ChannelFactory可以生成NioServerSocketChannel


6. bootstrap.childHandler

添加handler处理具体的事件,netty内部具体什么时间将handler添加到ChannelPipeline中在后面的代码中


7. ChannelFuture future = bootstrap.bind(8866).sync()绑定端口

(1) 由port创建java.net.InetSocketAddress

(2) doBind绑定,其中 initAndRegister和doBind0最重要

private ChannelFuture doBind(final SocketAddress localAddress) {
       //重点分析
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
           //重点分析
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else{省略}
            return promise;
        }
    }


(3) initAndRegister()分为init和register两步

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

       //将用户bootstrap.childHandler的事件添加到ChannelPipeline中
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }


init主要有两个功能:初始化Channel配置、将用户bootstrap.childHandler的事件添加到ChannelPipeline中

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }


doRegister()方法可以由initAndRegister()一步步调试得到,javaChannel().register(eventLoop().unwrappedSelector(), 0, this)此处是将java.nio.channels.ServerSocketChannel注册到java.nio.channels.Selector上,已经调试到jdk的代码了

(4) doBind0()

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }


channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)调用下面

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }


如果jdk大于7,则通过java.nio.channels.ServerSocketChannel#bind(java.net.SocketAddress, int)方法绑定地址;否则通过java.net.ServerSocket#bind(java.net.SocketAddress, int)方法绑定地址。


8. future.channel().closeFuture().sync()

阻塞住当前线程直到服务器端监听端口关闭

评论区
Rick ©2018