netty模型分析与代码示例

前言

    买了本《Netty权威指南》来看,目前粗略看了一些,感觉写得很好,为了能记录一些学习成果,便有了这篇文章,如果有地方写错了,也请各位指正。(文中演示代码并没考虑太多真实情况,比如 粘包/拆包 问题 )


概念介绍


常见I/O模型

1)阻塞I/O(blocking I/O)

2)非阻塞I/O (nonblocking I/O)

3) I/O复用(select,poll,epoll) (I/O multiplexing)

4)信号驱动I/O (signal driven I/O (SIGIO))

5)异步I/O (asynchronous I/O (the POSIX aio_functions))


关键流程

对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个I/O的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process) 记住这两点很重要,因为这些I/O Model的区别就是在这两个阶段上各有不同的情况。


I/O模型解释

阻塞I/O(blocking I/O)

在Linux中,默认情况下所有的socket都是blocking

blocking

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达,这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在I/O执行的两个阶段都被block了。

我们在使用这种方式进行网络编程时,因为会阻塞线程,为了能让服务器接收更多的请求,我们不得不使用多线程来处理新的请求I/O,由于线程的创建会产生开销,于是有人想到了用线程池来解决,这虽然提高了线程的复用率,但是本质上依旧没有解决因为线程阻塞带来的性能瓶颈,当大量的连接到来时,系统可能会因为没有足够的资源来处理请求,而拒绝连接。

上代码:

public class MyServer {
    public static class MyRun implements Runnable{
       private Socket socket = null;
        public MyRun(Socket socket) {
            this.socket = socket;
        }
        public void run() {
            BufferedReader bufferedReader = null;
            PrintWriter printWriter = null;
            try {
                bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line = bufferedReader.readLine();
                printWriter = new PrintWriter(socket.getOutputStream(), true);
                if (line.length() < 1){
                    printWriter.println("it is null");
                    return;
                }
                System.out.println(line);
                if ("get time".equals(line)){
                    printWriter.println(new Date().toString());
                }else{
                    printWriter.println("isn't ok");
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    if (bufferedReader != null) {
                        bufferedReader.close();
                    }
                    if (printWriter != null){
                        printWriter.close();
                    }
                    if (socket != null){
                        socket.close();
                    }
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) {
        int port = 8080;
        ServerSocket serverSocket = null;
        Executor executor = Executors.newFixedThreadPool(100);
        try {
             serverSocket = new ServerSocket(port);
            while (true){
                Socket accept = serverSocket.accept();
                executor.execute(new MyRun(accept));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (serverSocket != null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Copy

非阻塞I/O (nonblocking I/O)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时:

screenshot

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。

用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这个拷贝的过程会阻塞进程),然后返回。

所以,用户进程其实是需要不断的主动询问kernel数据好了没有。因为需要不断地轮询,这消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他I/O模型中使用非阻塞IO这一特性。

I/O复用(select,poll,epoll) (I/O multiplexing)

screenshot

I/O复用模型会用到select、poll、epoll函数,这几个函数也会使进程阻塞,但是和阻塞I/O所不同的是,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。

补充说明:

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在I/O multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket I/O给block。

I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,I/O多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降底了系统的维护工作量,节省了系统资源。

目前支持I/O多路复用的系统调用有 select,pselect,poll,epoll,在Linux网络编程过程中,很长一段时间都使用select做轮询和网络事件通知,然而select的一些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select的替代方案,最终选择了epoll。epoll与select的原理比较类似,为了克服select的缺点,epoll作了很多重大改进。

select:

select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:

1. 单个进程可监视的fd数量被限制,即能监听端口的大小有限。

一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.

  1. 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低:
  2. 当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询,这正是epoll与kqueue做的。
  3. 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大

poll:

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:

  1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。 2. poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

epoll:

epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就需态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。

Java从1.4开始提供了NIO工具包,支持用 I/O复用模型来进行网络编程。其模式图:

screenshot

上代码:

public class NioServer {

    public static void main(String[] args) {
        new Thread(new MyRun()).start();
    }

    public static class MyRun implements Runnable {
        private Selector selector;
        private ServerSocketChannel serverSocketChannel;
        private volatile boolean stop = false;

        public MyRun() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(8080), 1024);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (Iterator<SelectionKey> iterator = selectionKeys.iterator(); iterator.hasNext(); ) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        try {
                            handlerKey(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("server is stop");
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (serverSocketChannel != null) {
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private void handlerKey(SelectionKey key) throws IOException {
            if (key.isValid()) {
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                }
                if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = channel.read(byteBuffer);
                    if (read > 0) {
                        byteBuffer.flip();
                        byte[] bytes = new byte[byteBuffer.remaining()];
                        byteBuffer.get(bytes);
                        String recive_msg = new String(bytes, "utf-8");
                        System.out.println(recive_msg);
                        String rep_msg = "get time".equals(recive_msg) ? new Date().toString() : "bad time";
                        dowrite(channel, rep_msg);
                    } else if (read < 0) {
                        key.cancel();
                        channel.close();
                    } else {

                    }
                }
            }
        }

        private void dowrite(SocketChannel channel, String rep) throws IOException {
            if (rep != null && rep.length() > 0) {
                byte[] bytes = rep.getBytes();
                ByteBuffer wrte = ByteBuffer.allocate(bytes.length);
                wrte.put(bytes);
                wrte.flip();
                channel.write(wrte);
            }
        }
    }
}

Copy

信号驱动I/O (signal driven I/O (SIGIO))

screenshot

首先开启套接口信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据。

异步I/O (asynchronous I/O (the POSIX aio_functions))

screenshot

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们何时可以开始一个I/O操作,而异步I/O模型由内核通知我们I/O操作何时已经完成

JDK1.7 升级了NIO 类库,升级后的NIO类库被称为NIO2.0。java也正是提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO。

NIO2.0的异步套接字通道是真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。它不需要通过多路复用器(Selector) 对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

上代码

public class AIOServer {

    private AsynchronousServerSocketChannel serverSocketChannel;
    private CountDownLatch countDownLatch = new CountDownLatch(1);


    public AIOServer() {
        try {
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080),1024);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void go() throws IOException {
        try {
            doAccept();
            countDownLatch.await();
        }catch (Exception e){
            serverSocketChannel.close();
        }
    }

    private void doAccept() {
        serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
            public void completed(final AsynchronousSocketChannel  socketChannel, AIOServer aioServer) {
                //必须加这句才能接收其他的客户端连接,形成一个循环
                aioServer.serverSocketChannel.accept(aioServer,this);
                final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                    public void completed(Integer a, ByteBuffer bf) {
                        byteBuffer.flip();
                        byte[] bytes = new byte[bf.remaining()];
                        bf.get(bytes);
                        try {
                            String msg = new String(bytes, "utf-8");
                            System.out.println(msg);
                            String rep = "get time".equals(msg) ? new Date().toString() : "bad time";
                            doWrite(rep, socketChannel);
                        } catch (Exception e) {
                            try {
                                socketChannel.close();
                            } catch (IOException ex) {
                                e.printStackTrace();
                            }
                        }
                    }

                    private void doWrite(String rep, final AsynchronousSocketChannel result) throws IOException {
                        byte[] bytes = rep.getBytes("utf-8");
                        ByteBuffer bf = ByteBuffer.allocate(bytes.length);
                        bf.put(bytes);
                        bf.flip();
                        result.write(bf, bf, new CompletionHandler<Integer, ByteBuffer>() {
                            public void completed(Integer a, ByteBuffer bfer) {
                                if (bfer.hasRemaining()) {
                                    result.write(bfer, bfer, this);
                                }
                            }

                            public void failed(Throwable exc, ByteBuffer attachment) {
                                try {
                                    result.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });

                    }

                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            socketChannel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            public void failed(Throwable exc, AIOServer attachment) {
                try {
                    serverSocketChannel.close();
                    countDownLatch.countDown();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws IOException {
            new AIOServer().go();
    }
}

Copy

5个I/O模型的比较:

screenshot


区别

blocking和non-blocking的区别在哪?

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

所以阻塞IO和非阻塞IO的区别就在于:应用程序的调用是否立即返回!

synchronous IO和asynchronous IO的区别在哪?

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:

A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;

An asynchronous I/O operation does not cause the requesting process to be blocked;

所以同步IO和异步IO的区别就在于:数据访问的时候进程是否阻塞!


初见Netty

从之前的代码来看JDK NIO API复杂,同时JDK NIO还存在一些BUG,这对于网络编程人员简直是噩梦,万幸的是我们有Netty,它良好的设计帮我们简化了开发,让我们能专注于业务开发。对于之前代码,我们用Netty 可以这样写:

public class NettyServer {

    private static EventLoopGroup bosses = new NioEventLoopGroup();
    private static EventLoopGroup workers = new NioEventLoopGroup();

    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bosses, workers).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ChannelHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf bf = (ByteBuf) msg;
                        byte[] bs = new byte[bf.readableBytes()];
                        bf.readBytes(bs);

                        String recive_msg = new String(bs, "utf-8");
                        System.out.println(recive_msg);
                        String rep = "get time".equals(recive_msg) ? new Date().toString() : "bad time";
                        ByteBuf byteBuf = Unpooled.copiedBuffer(rep.getBytes("utf-8"));
                        ctx.writeAndFlush(byteBuf);
                    }

                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        ctx.close();
                    }
                });
            }
        });

        try {
            ChannelFuture sync = bootstrap.bind(8080).sync();
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            bosses.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}

Copy

可以看到代码简化了很多,在后面的文章中,我会继续说明netty的用法。


疑问

Netty 是对AIO 封装的还是对NIO封装的?我找到了如下答案:

Netty4的beta3加了AIO了,但是到beta9又被去了,作者的意思是测试下来AIO性能不如NIO,所以没必要用,在Linux上NIO的实现本身就是epoll,使用jdk的AIO没有意义,在windows上jdk的AIO实现是IOCP,这种情况下使用AIO是比poll的性能高的,但是netty的服务器一般是在linux上,所以抛弃windows没啥大不了,windows最多做个客户端,用nio也就够了。

问题网址在这:https://github.com/netty/netty/issues/2515 感兴趣的小伙伴可以去看看



评论区
Rick ©2018