Netty NIO中最重要的两个方法write和flush,write是把数据写入到OutboundBuffer(不真正发送数据),flush是真正的发送数据。Netty还提供另外一个方法writeAndFlush就是将write和flush方法合起来。
write方法源码分析
- Channel以NioSocketChannel为例,
- NioSocketChannel继承关系入下图所示,在调用流程图上我们用边上的类名表示抽象类或者内部类
1. AbstractChannel.write
客户端首先调用了write方法,pipeline默认是实现是DefaultChannelPipeline1
2
3
4
5
6public void sendMsg(ByteBuf msg){
future.channel().write(msg);
}
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
2. DefaultChannelPipeline.write方法
接着看DefaultChannelPipeline的write方法,最后一个handler写入1
2
3public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
3. 接着看AbstractChannelHandlerContext. write 方法
1 | public ChannelFuture write(Object msg) { |
3. AbstractChannelHandlerContext.write 方法
最后 在中AbstractChannelHandlerContext会调用WriteTask.newInstance(next, m, promise),生成一个task任务,这个任务主要是异步写入数据到OutboundBuffer中,源码在下面的分析。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
4. AbstractWriteTask中run方法
run方法调用write,然后write中调用外部类AbstractChannelHandlerContext的最终处理方法invokeWrite01
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public final void run() {
try {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
}
}
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}
private void invokeWrite(Object msg, ChannelPromise promise) {
if (isAdded()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
5. 内部类DefaultChannelPipeline.HeadContext
invokeWrite0方法调用了DefaultChannelPipeline中内部类HeadContext的write方法,然后write方法调用了最“苦逼” unsafe.write方法。unsafe定义是AbstractChannel内部类AbstractUnsafe。最后在outboundBuffer中加入了msg数。在后面分析flush时AbstractUnsafe也是干了最后的”苦力”。
DefaultChannelPipeline.HeadContext
1
2
3
4@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}AbstractChannel.AbstractUnsafe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
6. 调用流程图如下所示
flush方法源码分析
我们在分析write方式就提到,write只是把数据add到buffer里,并没有真正的发数据。flush方法是真正的发送数据。
1. AbstractChannel.flush
1 | public Channel flush() { |
2. DefaultChannelPipeline.flush
在DefaultChannelPipeline.flush中调用了tail.flush,最后一个handler Flush1
2
3
4public final ChannelPipeline flush() {
tail.flush();
return this;
}
3. AbstractChannelHandlerContext
接下来看HandlerContext的flush方法,new一个异步任务,任务里面执行invokeFlush方法,然后调用到invokeFlush0方法,最后调用handler.flush。这个handler是定义在DefaultChannelPipeline.HeadContext内部类。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}
return this;
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
private void invokeFlush() {
if (isAdded()) {
invokeFlush0();
} else {
flush();
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
4. DefaultChannelPipeline.HeadContext.flush
1 | public void flush(ChannelHandlerContext ctx) throws Exception { |
5. AbstractChannel.AbstractUnsafe
AbstractUnsafe的flush0方法调用了doWrite参数是outboundBuffer,也就是write方法中存放数据的buffer。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
protected void flush0() {
if (inFlush0) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
} else {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
close(voidPromise(), t, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
6. AbstractNioByteChannel
unsafe调用的doWrite方法中,有两个分支doWriteBytes和doWriteFileRegion方法,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
clearOpWrite();
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
break;
}
} else {
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
这里我们分析doWriteBytes方法
7. NioSocketChannel
doWriteBytes定义在NioSocketChannel上,抽象方法abstract int doWriteBytes(ByteBuf buf) 定义在NioSocketChannel父类AbstractNioByteChannel上(NioSocketChannel继承图在第一节)。这里我们以UnpooledHeapByteBuf为例分析readBytes方法1
2
3
4protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
8. UnpooledHeapByteBuf
readBytes里面调用了getBytes方法,getBytes里面调用了out.write方法,out是JDK原生的NIO Channel。可以参看SocketChannel接口,实现类是SocketChannelImpl。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18@Override
public int readBytes(GatheringByteChannel out, int length) throws IOException {
checkReadableBytes(length);
int readBytes = getBytes(readerIndex, out, length, true);
readerIndex += readBytes;
return readBytes;
}
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
ensureAccessible();
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = ByteBuffer.wrap(array);
}
return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));
}