1. OOM
- 1、当netty服务端并发压力过大,超过了服务端的处理能力时,channel中的消息服务端不能及时消费,这时channel堵塞,客户端消息就会堆积在发送队列中
- 2、网络瓶颈,当客户端发送速度超过网络链路处理能力,会导致客户端发送队列积压
- 3、当对端读取速度小于己方发送速度,导致自身TCP发送缓冲区满,频繁发生write 0字节时,待发送消息会在netty发送队列中排队
在我们调用channel的write和writeAndFlush时 io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise),如果发送方为业务线程,则将发送操作封装成WriteTask(继承Runnable),放到Netty的NioEventLoop中执行,当NioEventLoop无法完成如此多的消息的发送的时候,发送任务队列积压,进而导致内存泄漏。
io.netty.channel.ChannelConfig#setWriteBufferHighWaterMark 高水位 io.netty.channel.ChannelConfig#setWriteBufferLowWaterMark 低水位
2. netty的消息发送机制
业务调用write方法后,经过ChannelPipeline职责链处理,消息被投递到发送缓冲区待发送,调用flush之后会执行真正的发送操作,底层通过调用Java NIO的SocketChannel进行非阻塞write操作,将消息发送到网络上,
private void write(Object msg, boolean flush, ChannelPromise promise) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
// See https://github.com/netty/netty/issues/8343.
Netty的NioEventLoop线程内部维护了一个Queue<Runnable> taskQuue
void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
数据会在 Pipeline 中一直寻找 Outbound 节点并向前传播,直到 Head 节点结束,由 Head 节点完成最后的数据发送。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
public void flush(ChannelHandlerContext ctx) {
Head 节点是通过调用 unsafe 对象完成数据写入的,unsafe 对应的是 NioSocketChannelUnsafe 对象实例,最终调用到 AbstractChannel 中的 write 方法
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
try {
} finally {
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
} catch (Throwable t) {
try {
} finally {
safeSetFailure(promise, t);
outboundBuffer.addMessage(msg, size, promise);
- filterOutboundMessage 方法会对待写入的 msg 进行过滤,如果 msg 使用的不是 DirectByteBuf,那么它会将 msg 转换成 DirectByteBuf。
- ChannelOutboundBuffer 可以理解为一个缓存结构,从源码最后一行 outboundBuffer.addMessage 可以看出是在向这个缓存中添加数据,所以 ChannelOutboundBuffer 才是理解数据发送的关键。
writeAndFlush 主要分为两个步骤,write 和 flush。通过上面的分析可以看出只调用 write 方法,数据并不会被真正发送出去,而是存储在 ChannelOutboundBuffer 的缓存内。
ChannelOutboundBuffer是一个链表结构,它包含三个非常重要的指针:第一个被写到缓冲区的节点 flushedEntry、第一个未被写到缓冲区的节点 unflushedEntry和最后一个节点 tailEntry。
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
// The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
private Entry tailEntry;
// The number of flushed entries that are not written yet
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
incrementPendingOutboundBytes(entry.pendingSize, false);
在初始状态下这三个指针都指向 NULL,当我们每次调用 write 方法是,都会调用 addMessage 方法改变这三个指针的指向,可以参考下图理解指针的移动过程会更加形象。
第一次调用 write,因为链表里只有一个数据,所以 unflushedEntry 和 tailEntry 指针都指向第一个添加的数据 msg1。flushedEntry 指针在没有触发 flush 动作时会一直指向 NULL。
第二次调用 write,tailEntry 指针会指向新加入的 msg2,unflushedEntry 保持不变。
第 N 次调用 write,tailEntry 指针会不断指向新加入的 msgN,unflushedEntry 依然保持不变,unflushedEntry 和 tailEntry 指针之间的数据都是未写入 Socket 缓冲区的。
以上便是写 Buffer 队列写入数据的实现原理,但是我们不可能一直向缓存中写入数据,所以 addMessage 方法中每次写入数据后都会调用 incrementPendingOutboundBytes 方法判断缓存的水位线
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 判断缓存大小是否超过高水位线
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
incrementPendingOutboundBytes 的逻辑非常简单,每次添加数据时都会累加数据的字节数,然后判断缓存大小是否超过所设置的高水位线 64KB,如果超过了高水位,那么 Channel 会被设置为不可写状态。直到缓存的数据大小低于低水位线 32KB 以后,Channel 才恢复成可写状态。
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
// 减去待发送的数据,如果总字节数低于低水位,那么 Channel 将变为可写状态
decrementPendingOutboundBytes(pending, false, true);
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
addFlush 方法同样也会操作 ChannelOutboundBuffer 缓存数据。在执行 addFlush 方法时,缓存中的指针变化又是如何呢?如下图所示,我们在写入流程的基础上继续进行分析。
此时 flushedEntry 指针有所改变,变更为 unflushedEntry 指针所指向的数据,然后 unflushedEntry 指针指向 NULL,flushedEntry 指针指向的数据才会被真正发送到 Socket 缓冲区。
在 addFlush 源码中 decrementPendingOutboundBytes 与之前 addMessage 源码中的 incrementPendingOutboundBytes 是相对应的。decrementPendingOutboundBytes 主要作用是减去待发送的数据字节,如果缓存的大小已经小于低水位,那么 Channel 会恢复为可写状态。
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
3. 水位用法
Netty中提供 了writeBufferLowWaterMark和writeBufferHighWaterMark选项用来控制高低水位。可以通过监控当前写缓冲区的水位状况,来避免占用大量的内存,因为ChannelOutboundBuffer本身是无界的,
childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024));
if (ctx.channel().isWritable()) {
} else {
// TODO,记录日志,消息可能会被丢弃
4. 参考资料
