Netty 水位线之 ChannelOutboundBuffer 源码分析

每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。

ChannelOutboundBuffer 类属性

public final class ChannelOutboundBuffer {

    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 8 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
    static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);


    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //
    // 缓存链表中被刷新的第一个元素
    private Entry flushedEntry;
    // 缓存链表中中第一个未刷新的元素
    private Entry unflushedEntry;
    // 缓存链表中的尾元素
    private Entry tailEntry;
    // 刷新但还没有写入到 socket 中的数量
    private int flushed;

从类的属性中可以看出 ChannelOutboundBuffer 定义了一个链表进行存储写入的数据。

1、ChannelOutboundBuffer 添加了N个 Entry


2、当 flush 后的效果


3、flush 后把所有的数据都写入 Socket 中


4、当再次写入 Entry 后


下面针对操作这些操作进行分析,添加、刷新、删除等操作。

addMessage() 添加操作

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

1、创建一个 新的Entry。

2、判断 tailEntry 是否为 null,如果为 null 说明链表为空。则把 flushedEntry 置为null。

3、如果 tailEntry 不为空,则把新添加的 Entry 添加到 tailEntry 后面 。

4、 将新添加的 Entry 设置为 链表的 tailEntry。

5、如果 unflushedEntry 为null,说明没有未被刷新的元素。新添加的Entry 肯定是未被刷新的,则把当前 Entry 设置为 unflushedEntry 。

6、统计未被刷新的元素的总大小。

addFlush() 刷新操作

当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将 unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,至于为什么这么做?

答:因为 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。

public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

1、通过 unflushedEntry 获取未被刷新元素 entry。

2、如果 entry 为null 说明没有待刷新的元素,不执行任何操作

3、如果 entry 不为 null,说明有需要被刷新的元素

4、如果 flushedEntry == null 说明当前没有正在刷新的任务,则把 entry 设置为 flushedEntry 刷新的起点。

5、循环设置 entry, 设置这些 entry 状态设置为非取消状态,如果设置失败,则把这些entry 节点取消并使 totalPendingSize 减去这个节点的字节大小。

在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。代码参见 AbstractUnsafe.flush() 方法。

remove() 删除操作

public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }

    // recycle the entry
    e.recycle();

    return true;
}

1、获取 flushedEntry 节点,链表的头结点。如果获取不到清空 ByteBuf 缓存。

2、在链表上移除该 Entry。如果之前没有取消,只释放消息、通知和递减。

3、回收 Entry 对象。

removeEntry() 链表移除节点

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        // processed everything
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}

1、如果 flushed ==0 说明,链表中所有 flush 的数据都已经发送到 Socket 中。把 flushedEntry 置位 null。此时链表可能还有 unflushedEntry 数据。

如果此时 e == tailEntry 说明链表为空,则把 tailEntry 和 unflushedEntry 都置为空。

flush==0,可能是这种状态,如下图:


2、把 flushedEntry 置为下一个节点(flushedEntry 此时是头结点)。

Entry 对象池使用

由于 Entry 使用比较频繁,会频繁的创建和销毁,这里使用了 Entry 的对象池,创建的时候从缓存中获取,销毁时回收。

下面看下 Entry 的创建过程

Entry 创建

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    ..... 

使用 Entry.newInstance 方法进行创建 Entry 对象。

static final class Entry {
    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
        @Override
        protected Entry newObject(Handle<Entry> handle) {
            return new Entry(handle);
        }
    };

    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
        entry.total = total;
        entry.promise = promise;
        return entry;
    }

Entry.newInstance() 方法收下从 Recycler 中获取 Entry 对象,如果获取不到则使用 Recycler.newObject() 方法创建一个 Entry 对象,调用 newObject() 方法在 Recycler.get() 方法中,代码如下:

public abstract class Recycler<T> {
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

从 Recycler 中可以看出,Entry 对象是存储在 Stack 中。如果 Stack 中没有可用的 Stack,则调用 newObject() 方法创建。

Entry 对象回收

public boolean remove() {
    Entry e = flushedEntry;
    ...
    e.recycle();

    return true;
}

当 Entry 从链表中移除的时候回调用 e.recycle() 方法。

static final class Entry {
    void recycle() {
        next = null;
        bufs = null;
        buf = null;
        msg = null;
        promise = null;
        progress = 0;
        total = 0;
        pendingSize = 0;
        count = -1;
        cancelled = false;
        handle.recycle(this);
    }

Entry.recycle() 方法会把 Entry 中成员变量全部初始化。然后在调用 handle.recycle() 方法。

public abstract class Recycler<T> {
    static final class DefaultHandle<T> implements Handle<T> {
        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }

            stack.push(this);
        }
    }

Recycler.recycle() 方法又把 Entry 对象压进 stack 中。



评论区
Rick ©2018