Netty4.x write和flush源码分析

Netty NIO中最重要的两个方法write和flush,write是把数据写入到OutboundBuffer(不真正发送数据),flush是真正的发送数据。Netty还提供另外一个方法writeAndFlush就是将write和flush方法合起来。

write方法源码分析

  • Channel以NioSocketChannel为例,
  • NioSocketChannel继承关系入下图所示,在调用流程图上我们用边上的类名表示抽象类或者内部类

1. AbstractChannel.write

客户端首先调用了write方法,pipeline默认是实现是DefaultChannelPipeline

1
2
3
4
5
6
public void sendMsg(ByteBuf msg){
future.channel().write(msg);
}
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}

2. DefaultChannelPipeline.write方法

接着看DefaultChannelPipeline的write方法,最后一个handler写入

1
2
3
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}

3. 接着看AbstractChannelHandlerContext. write 方法

1
2
3
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}

3. AbstractChannelHandlerContext.write 方法

最后 在中AbstractChannelHandlerContext会调用WriteTask.newInstance(next, m, promise),生成一个task任务,这个任务主要是异步写入数据到OutboundBuffer中,源码在下面的分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}

try {
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

4. AbstractWriteTask中run方法

run方法调用write,然后write中调用外部类AbstractChannelHandlerContext的最终处理方法invokeWrite0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final void run() {
try {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
}
}

protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}

private void invokeWrite(Object msg, ChannelPromise promise) {
if (isAdded()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}

5. 内部类DefaultChannelPipeline.HeadContext

invokeWrite0方法调用了DefaultChannelPipeline中内部类HeadContext的write方法,然后write方法调用了最“苦逼” unsafe.write方法。unsafe定义是AbstractChannel内部类AbstractUnsafe。最后在outboundBuffer中加入了msg数。在后面分析flush时AbstractUnsafe也是干了最后的”苦力”。

  • DefaultChannelPipeline.HeadContext

    1
    2
    3
    4
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
    }
  • AbstractChannel.AbstractUnsafe

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
    safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
    // release message now to prevent resource-leak
    ReferenceCountUtil.release(msg);
    return;
    }

    int size;
    try {
    msg = filterOutboundMessage(msg);
    size = pipeline.estimatorHandle().size(msg);
    if (size < 0) {
    size = 0;
    }
    } catch (Throwable t) {
    safeSetFailure(promise, t);
    ReferenceCountUtil.release(msg);
    return;
    }
    outboundBuffer.addMessage(msg, size, promise);
    }

6. 调用流程图如下所示

nett-write

flush方法源码分析

我们在分析write方式就提到,write只是把数据add到buffer里,并没有真正的发数据。flush方法是真正的发送数据。

1. AbstractChannel.flush

1
2
3
4
public Channel flush() {
pipeline.flush();
return this;
}

2. DefaultChannelPipeline.flush

在DefaultChannelPipeline.flush中调用了tail.flush,最后一个handler Flush

1
2
3
4
public final ChannelPipeline flush() {
tail.flush();
return this;
}

3. AbstractChannelHandlerContext

接下来看HandlerContext的flush方法,new一个异步任务,任务里面执行invokeFlush方法,然后调用到invokeFlush0方法,最后调用handler.flush。这个handler是定义在DefaultChannelPipeline.HeadContext内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}
return this;
}

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}

private void invokeFlush() {
if (isAdded()) {
invokeFlush0();
} else {
flush();
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}

4. DefaultChannelPipeline.HeadContext.flush

1
2
3
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();

}

5. AbstractChannel.AbstractUnsafe

AbstractUnsafe的flush0方法调用了doWrite参数是outboundBuffer,也就是write方法中存放数据的buffer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}

protected void flush0() {
if (inFlush0) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}

inFlush0 = true;
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
} else {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}

try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
close(voidPromise(), t, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}

6. AbstractNioByteChannel

unsafe调用的doWrite方法中,有两个分支doWriteBytes和doWriteFileRegion方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
clearOpWrite();
return;
}

if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}

boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}

flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}

in.progress(flushedAmount);

if (done) {
in.remove();
} else {
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();

if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}

for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}

flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}

in.progress(flushedAmount);
}

if (done) {
in.remove();
} else {
break;
}
} else {
throw new Error();
}
}
incompleteWrite(setOpWrite);
}

这里我们分析doWriteBytes方法

7. NioSocketChannel

doWriteBytes定义在NioSocketChannel上,抽象方法abstract int doWriteBytes(ByteBuf buf) 定义在NioSocketChannel父类AbstractNioByteChannel上(NioSocketChannel继承图在第一节)。这里我们以UnpooledHeapByteBuf为例分析readBytes方法

1
2
3
4
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

8. UnpooledHeapByteBuf

readBytes里面调用了getBytes方法,getBytes里面调用了out.write方法,out是JDK原生的NIO Channel。可以参看SocketChannel接口,实现类是SocketChannelImpl。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {

checkReadableBytes(length);
int readBytes = getBytes(readerIndex, out, length, true);
readerIndex += readBytes;
return readBytes;
}

private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
ensureAccessible();
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = ByteBuffer.wrap(array);
}
return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
}

8. flush调用流程图

netty-flush