Netty - 主Reactor执行主流程

最后更新:2019-01-15

ServerBootstrap在注册Channel时,会通过AbstractChannel#register方法注册channel

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
		...
    }
}

此时我们的线程是Main线程,所以它会执行eventLoop#execute方法来注册Channel,而这个方法的实现是在SingleThreadEventExecutor。

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
		...
    }
}

private void startThread() {
	if (state == ST_NOT_STARTED) {
		if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
			...
			doStartThread();
			...
		}
	}
}

private void doStartThread() {
	...
	executor.execute(new Runnable() {
		@Override
		public void run() {
			...
			try {
				// 重要的地方
				SingleThreadEventExecutor.this.run();
				success = true;
			} catch (Throwable t) {
				logger.warn("Unexpected exception from an event executor: ", t);
			} finally {
				...
			}
		}
	});
}

可以看到注册Channel的时候,SingleThreadEventExecutor才正式启动了线程池,当我们进入NioEventLoop run()方法后,可以发现它是一个无限循环,没有任何退出条件。

@Override
protected void run() {
	int selectCnt = 0;
	for (;;) {
		try {
			int strategy;
			try {
				strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
				switch (strategy) {
				case SelectStrategy.CONTINUE:
					continue;
				case SelectStrategy.BUSY_WAIT:
				case SelectStrategy.SELECT: // 轮询 I/O 事件
					long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
					if (curDeadlineNanos == -1L) {
						curDeadlineNanos = NONE; // nothing on the calendar
					}
					nextWakeupNanos.set(curDeadlineNanos);
					try {
						if (!hasTasks()) {
							strategy = select(curDeadlineNanos);
						}
					} finally {
						nextWakeupNanos.lazySet(AWAKE);
					}
					// fall through
				default:
				}
			} catch (IOException e) {
				...
			}

			selectCnt++;
			cancelledKeys = 0;
			needsToSelectAgain = false;
			final int ioRatio = this.ioRatio;
			boolean ranTasks;
			if (ioRatio == 100) {
				try {
					if (strategy > 0) {
						// 处理 I/O 事件
						processSelectedKeys();
					}
				} finally {
					// 处理所有任务
					ranTasks = runAllTasks();
				}
			} else if (strategy > 0) {
				final long ioStartTime = System.nanoTime();
				try {
					// 处理 I/O 事件
					processSelectedKeys();
				} finally {
					// // 处理完 I/O 事件,再处理异步任务队列
					final long ioTime = System.nanoTime() - ioStartTime;
					ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
				}
			} else {
				ranTasks = runAllTasks(0); // This will run the minimum number of tasks
			}

			...
		} catch (e) {
			...
		} finally {
			...
		}
	}
}

上述的代码在不间断循环执行以下三件事情,

  • 轮询 I/O 事件(select):轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件。

  • 处理 I/O 事件(processSelectedKeys):处理已经准备就绪的 I/O 事件。

  • 处理异步任务队列(runAllTasks):Reactor 线程还有一个非常重要的职责,就是处理任务队列中的非 I/O 任务。Netty 提供了 ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。

轮询 I/O 事件

switch (strategy) {
case SelectStrategy.CONTINUE:
	continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT: // 轮询 I/O 事件
	long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
	if (curDeadlineNanos == -1L) {
		curDeadlineNanos = NONE; // nothing on the calendar
	}
	nextWakeupNanos.set(curDeadlineNanos);
	try {
		if (!hasTasks()) {
			strategy = select(curDeadlineNanos);
		}
	} finally {
		// This update is just to help block unnecessary selector wakeups
        // so use of lazySet is ok (no race condition)
		nextWakeupNanos.lazySet(AWAKE);
	}
	// fall through
default:
}

NioEventLoop 通过核心方法 select() 不断轮询注册的 I/O 事件。当没有 I/O 事件产生时,为了避免 NioEventLoop 线程一直循环空转,在获取 I/O 事件或者异步任务时需要阻塞线程,等待 I/O 事件就绪或者异步任务产生后才唤醒线程。NioEventLoop通过nextWakeupNanos来判断唤醒逻辑

// nextWakeupNanos is:
//    AWAKE            when EL is awake
//    NONE             when EL is waiting with no wakeup scheduled
//    other value T    when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

Netty 提供了选择策略 SelectStrategy 对象,它用于控制 select 循环行为,包含 CONTINUE、SELECT、BUSY_WAIT 三种策略,因为 NIO 并不支持 BUSY_WAIT,所以 BUSY_WAIT 与 SELECT 的执行逻辑是一样的。

SelectStrategy是如何判断的?

final class DefaultSelectStrategy implements SelectStrategy {
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

// NioEventLoop 
private final IntSupplier selectNowSupplier = new IntSupplier() {
	public int get() throws Exception {
		return NioEventLoop.this.selectNow();
	}
};

int selectNow() throws IOException {
	return this.selector.selectNow();
}

如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回。如果存在就绪的 I/O 事件,那么会走到 default 分支后直接跳出,然后执行 I/O 事件处理 processSelectedKeys 和异步任务队列处理 runAllTasks 的逻辑。所以在存在异步任务的场景,NioEventLoop 会优先保证 CPU 能够及时处理异步任务。

当 NioEventLoop 线程的不存在异步任务,即任务队列为空,返回的是 SELECT 策略, 就会调用select方法

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    // 判断是否超过5微妙,如果超过说明当前任务队列中有定时任务需要立刻执行,执行延时select,否则就执行selectNow()
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

处理 I/O 事件

通过 select 过程我们已经获取到准备就绪的 I/O 事件,接下来就需要调用 processSelectedKeys() 方法处理 I/O 事件。在开始处理 I/O 事件之前,Netty 通过 ioRatio 参数控制 I/O 事件处理和任务处理的时间比例,默认为 ioRatio = 50。如果 ioRatio = 100,表示每次都处理完 I/O 事件后,会执行所有的 task。如果 ioRatio < 100,也会优先处理完 I/O 事件,再处理异步任务队列。所以不论如何 processSelectedKeys() 都是先执行的:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

处理 I/O 事件时有两种选择,一种是处理 Netty 优化过的 selectedKeys,另外一种是正常的处理逻辑。根据是否设置了 selectedKeys 来判断使用哪种策略,这两种策略使用的 selectedKeys 集合是不一样的。Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型,而正常逻辑使用的是 JDK HashSet 类型。

processSelectedKeysPlain

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
	...
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        // SelectionKey 上面可以挂载 attachment
        final Object a = k.attachment();
        i.remove();
		//根据 attachment 属性可以判断 SelectionKey 的类型,AbstractNioChannel 类型由 Netty 框架负责处理
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            // NioTask 是用户自定义的 task
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
		...
    }
}

processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 检查 Key 是否合法
    if (!k.isValid()) {
        final EventLoop eventLoop;
		...
		
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            // key不合法,关闭连接
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // 处理连接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 处理可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 处理可读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
  • OP_CONNECT 连接建立事件:表示 TCP 连接建立成功, Channel 处于 Active 状态。处理 OP_CONNECT 事件首先将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件,然后调用 unsafe.finishConnect() 方法通知上层连接已经建立。最终会调用的 pipeline().fireChannelActive() 方法,这时会产生一个 Inbound 事件,然后会在 Pipeline 中进行传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功。
  • OP_WRITE,可写事件:表示上层可以向 Channel 写入数据,通过执行 ch.unsafe().forceFlush() 操作,将数据冲刷到客户端,最终会调用 javaChannel 的 write() 方法执行底层写操作。
  • OP_READ,可读事件:表示 Channel 收到了可以被读取的新数据。Netty 将 READ 和 Accept 事件进行了统一的封装,都通过 unsafe.read() 进行处理。unsafe.read() 的逻辑可以归纳为几个步骤:从 Channel 中读取数据并存储到分配的 ByteBuf;调用 pipeline.fireChannelRead() 方法产生 Inbound 事件,然后依次调用 ChannelHandler 的 channelRead() 方法处理数据;调用 pipeline.fireChannelReadComplete() 方法完成读操作;最终执行 removeReadOp() 清除 OP_READ 事件。

processSelectedKeysOptimized

processSelectedKeysOptimized 与 processSelectedKeysPlain 的代码结构非常相似,其中最重要的一点就是 selectedKeys 的遍历方式是不同的,

因为 SelectedSelectionKeySet 内部使用的是 SelectionKey 数组,所以 processSelectedKeysOptimized 可以直接通过遍历数组取出 I/O 事件,相比 JDK HashSet 的遍历效率更高。SelectedSelectionKeySet 内部通过 size 变量记录数据的逻辑长度,每次执行 add 操作时,会把对象添加到 SelectionKey[] 尾部。当 size 等于 SelectionKey[] 的真实长度时,SelectionKey[] 会进行扩容。相比于 HashSet,SelectionKey[] 不需要考虑哈希冲突的问题,所以可以实现 O(1) 时间复杂度的 add 操作。

这个SelectedSelectionKeySet是NioEventLoop#openSelector 方法通过反射的方式,将 Selector 对象内部的 selectedKeys 和 publicSelectedKeys 替换为 SelectedSelectionKeySet

处理异步任务队列

NioEventLoop 内部有两个非常重要的异步任务队列,分别为普通任务队列和定时任务队列。NioEventLoop 提供了 execute() 和 schedule() 方法用于向不同的队列中添加任务,execute() 用于添加普通任务,schedule() 方法用于添加定时任务。

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // 将任务添加到了 taskQueue
    addTask(task);
	...

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

在任务处理的场景下,inEventLoop() 始终是返回 true,始终都是在 Reactor 线程内串行执行。处理异步任务队列有 runAllTasks() 和 runAllTasks(long timeoutNanos) 两种实现,第一种会处理所有任务,第二种是带有超时时间来处理任务。之所以设置超时时间是为了防止 Reactor 线程处理任务时间过长而导致 I/O 事件阻塞,

protected boolean runAllTasks(long timeoutNanos) {
	// 合并定时任务到普通任务队列
    fetchFromScheduledTaskQueue();
    // 从普通任务队列中取出任务并处理
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
	// 计算任务处理的超时时间
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
    	// 执行任务
        safeExecute(task);

        runTasks ++;

        // 每执行 64 个任务检查一下是否超时
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
		// 继续取出下一个任务
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
	// 收尾工作,把尾部队列 tailTasks 里的任务以此取出执行一遍
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

异步任务处理 runAllTasks 的过程可以分为三步:合并定时任务到普通任务队列,然后从普通任务队列中取出任务并处理,最后进行收尾工作

如果想对 Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等,都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。

Edgar

Edgar
一个略懂Java的小菜比