Channel 与 ChannelPipeline
相信大家都知道了, 在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系如下:head---->handler----->tail通过上图我们可以看到, 一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表. 这个链表的头是 HeadContext, 链表的尾是 TailContext, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler.上面的图示给了我们一个对 ChannelPipeline 的直观认识, 但是实际上 Netty 实现的 Channel 是否真的是这样的呢? 我们继续用源码说话. 在第一章 Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 中, 我们已经知道了一个 Channel 的初始化的基本过程, 下面我们再回顾一下.下面的代码是 AbstractChannel 构造器: protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }在newChannelPipeline函数中把this也就是NioSocketChannel传过去
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }在channelPipeline里面持有一个channel对象
bstractChannel 有一个 pipeline 字段, 在构造器中会初始化它为 DefaultChannelPipeline的实例. 这里的代码就印证了一点: 每个 Channel 都有一个 ChannelPipeline.接着我们跟踪一下 DefaultChannelPipeline 的初始化过程.首先进入到 DefaultChannelPipeline 构造器中:protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);
head = new HeadContext(this);head.next = tail;
tail.prev = head; }在 DefaultChannelPipeline 构造器中, 首先将与之关联的 Channel 保存到字段 channel 中, 然后实例化两个 ChannelHandlerContext,
一个是 HeadContext 实例 head, 另一个是 TailContext 实例 tail. 接着将 head 和 tail 互相指向, 构成一个双向链表.特别注意到, 我们在开始的示意图中, head 和 tail 并没有包含 ChannelHandler, 这是因为 HeadContext 和 TailContext 继承于 AbstractChannelHandlerContext 的同时也实现了 ChannelHandler 接口了, 因此它们有 Context 和 Handler 的双重属性.headContext和tailContext是ChannelPipeline的内部类
headContext和tailContext不仅都继承了AbstractChannelHandlerContext还都实现了ChannelHandler接口final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandlerChannelInBoundHandler或者ChannelOutboundHandler就是继承了ChannelHandler接口
public interface ChannelInboundHandler extends ChannelHandlerpublic interface ChannelOutboundHandler extends ChannelHandler接着看一下 HeadContext 的构造器:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); }接着调用父类AbstractChannelHandlerContext的构造函数 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; }ChannelInitializer 的添加
上面一小节中, 我们已经分析了 Channel 的组成, 其中我们了解到, 最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler), 但是这个 Pipeline并不能实现什么特殊的功能, 因为我们还没有给它添加自定义的 ChannelHandler.通常来说, 我们在初始化 Bootstrap, 会添加我们自定义的 ChannelHandler, 就以我们熟悉的 EchoClient 来举例吧:
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } });上面代码的初始化过程, 相信大家都不陌生. 在调用 handler 时, 传入了 ChannelInitializer 对象,
它提供了一个 initChannel 方法供我们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢? 下面我们就来揭开它的神秘面纱.ChannelInitializer 实现了 ChannelHandler,
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapterpublic class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
那么它是在什么时候添加到 ChannelPipeline 中的呢?
进行了一番搜索后, 我们发现它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的. final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); }ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }return regFuture;
}init方法里面传入的是NioSocketChannel
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(config.handler());final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) { setChannelOptions(channel, options, logger); }final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }从NioSocketChannel里面拿到pipeline
ChannelPipeline p = channel.pipeline();p.addLast(config.handler());config.handler()返回的就是bootstrap.handler()其实就是 ChannelInitializer
config是bootstrap的成员变量,而且吧bootstrap传给他做参数
private final BootstrapConfig config = new BootstrapConfig(this);addLast方法要一步一步的跟踪,进入DefaultChannelPipeline
public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); }在进入重载函数
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); }for (ChannelHandler h: handlers) {
if (h == null) { break; } addLast(executor, null, h); }return this;
}在进入重载函数
newContext是AbstractChannelHandlerContext对象,把handler也就是ChannelInitializer作为参数传入进去
在newCtx的时候把handler也就是ChannelInitializer封装在AbstractChannelHandlerContext里面public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; }EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }在 addLast0函数中进行添加
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }有朋友可能就有疑惑了, 我明明插入的是一个 ChannelInitializer 实例,
为什么在 ChannelPipeline 中的双向链表中的元素却是一个 ChannelHandlerContext? 为了解答这个问题, 我们继续在代码中寻找答案吧.我们刚才提到, 在 Bootstrap.init 中会调用 p.addLast() 方法, 将 ChannelInitializer 插入到链表末端,而 ChannelInitializer是封装在AbstractChannelHandlerContext里面的可以清楚地看到, ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口,
因此这里实例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.不就是 inbound 和 outbound 两个字段嘛, 为什么需要这么大费周章地分析一番? 其实这两个字段关系到 pipeline 的事件的流向与分类, 因此是十分关键的, 不过我在这里先卖个关子, 后面我们再来详细分析这两个字段所起的作用. 在这里, 读者只需要记住, ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 即可.当创建好 Context 后, 就将这个 Context 插入到 Pipeline 的双向链表中
------------------------------------------------------------------
自定义 ChannelHandler 的添加过程
我们已经分析了一个 ChannelInitializer 如何插入到 Pipeline 中的, 接下来就来探讨一下 ChannelInitializer 在哪里被调用,
ChannelInitializer 的作用, 以及我们自定义的 ChannelHandler 是如何插入到 Pipeline 中的.首先在 AbstractBootstrap.initAndRegister中, 通过 group().register(channel), 调用 MultithreadEventLoopGroup.register 方法
MultithreadEventLoopGroup是NioEventloopGroup的父类public ChannelFuture register(Channel channel) {
return next().register(channel); }next返回的是NioeventLoop,NioEventLoop是SingleThreadEventLoop, regiser方法在SingleThreadEventLoop里面
在MultithreadEventLoopGroup.register 中, 通过 next() 获取一个可用的 SingleThreadEventLoop, 然后调用它的 register在 SingleThreadEventLoop.register 中, 通过 channel.unsafe().register(this, promise) 来获取 channel 的 unsafe() 底层操作对象,
然后调用它的 register. public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } promise.channel().unsafe().register(this, promise); unsafe()是在NioSocketChannel里面创建出来的NioSocketChannelUnsafe register方法是AbstractUnsafe, AbstractUnsafe是NioSockketChannelUnsafe的曾祖父 参数eventLoop是nioEventLoop, promise是上面new DefaultChannelPromise 里面持有channelpublic final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }在 AbstractUnsafe.register 方法中, 调用 register0 方法注册 Channel
在 AbstractUnsafe.register0 中, 调用 AbstractNioChannel#doRegister 方法
AbstractNioChannel.doRegister 方法通过 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
---------------------------------------------------------------------------------------------------------
而我们自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中,
在这个方法中调用了 pipeline.fireChannelRegistered() 方法, 其实现如下:private void register0(ChannelPromise promise) {
try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } 调用的是DefaultChannelPipeline @Override public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; }static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }next.executor()返回的是channel里面的eventLoop,
@Override
public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } }public Channel channel() {
return pipeline.channel(); }eventLoop是在AbstractUnsafe.register函数里面赋值给AbstractChannel。this.eventLoop对象的
AbstractUnsafe是AbstractChannel的内部类可以调用外部类AbstractUnsafe.register(EventLoop eventLoop, ChannelPromise promise)
还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例,
并且它没有重写 fireChannelRegistered 方法, 因此 head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered:上面的代码很简单, 就是调用了 head.fireChannelRegistered() 方法而已.
关于上面代码的 head.fireXXX 的调用形式, 是 Netty 中 Pipeline 传递事件的常用方式, 我们以后会经常看到.
还记得 head 的 类层次结构图不, head 是一个 AbstractChannelHandlerContext 实例,
并且它没有重写 fireChannelRegistered 方法, 因此 head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered:static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } }executor返回的就是channel里面的NioEventLoop
public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } }我们已经强调过了, 每个 ChannelHandler 都与一个 ChannelHandlerContext 关联,
我们可以通过 ChannelHandlerContext 获取到对应的 ChannelHandler. 因此很显然了, 这里 handler() 返回的, 其实就是 head 对象, 并接着调用了 head.channelRegistered 方法.private void invokeChannelRegistered() {
if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } } ctx是head对象 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); }调用head的 fireChannelRegistered 函数 public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; }很显然, 这个代码会从 head 开始遍历 Pipeline 的双向链表,
然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例. 想起来了没? 我们在前面分析 ChannelInitializer 时, 花了大量的笔墨来分析了 inbound 和 outbound 属性, 你看现在这里就用上了. 回想一下, ChannelInitializer 实现了 ChannelInboudHandler, 因此它所对应的 ChannelHandlerContext 的 inbound 属性就是 true, 因此这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext. 即:返回的就是DefaultChannelHandlerContext
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }接着调用DefaultChannelHandlerContext的 invokeChannelRegistered, handler 返回的就是ChannelInitializer
private void invokeChannelRegistered() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }调用 ChannelInitializer 的channelRegistered 函数
参数 ctx 是 this也就是 DefaultChannelHandlerContext @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove // the handler. if (initChannel(ctx)) { // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not // miss an event. ctx.pipeline().fireChannelRegistered(); } else { // Called initChannel(...) before which is the expected behavior, so just forward the event. ctx.fireChannelRegistered(); } }@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }initChannel 这个方法我们很熟悉了吧, 它就是我们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } });因此当调用了这个方法后, 我们自定义的 ChannelHandler 就插入到 Pipeline 了,