Categories
Uncategorized

Netty source code analysis (c) —– start the server source code analysis

This article is concerned then the previous two articles, mainly about the rest of the server class, we take a look at the server-side code

/**
 * Created by chenhao on 2019/9/4.
 */
public final class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new SimpleServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

In two previous blog post analysis of the following lines of code from the perspective of the main source of what had been done.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new SimpleServerHandler())
        .childHandler(new SimpleServerInitializer())
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);

This blog from the source point of view of internal ChannelFuture f = b.bind (8888) .sync () implementation. This completes the source code analysis Netty server startup process.

Source Analysis ChannelFuture f = b.bind (8888) .sync ()

AbstractBootstrap.java

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

We then look at the overloaded bind

public ChannelFuture bind(SocketAddress localAddress) {
    validate();//

Check the relevant parameters

if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress);//

The analysis below

}

This function mainly depends on two things: validate () and doBind (localAddress)

validate () method

//

Function: check the relevant parameters are set

@SuppressWarnings("unchecked") public B validate() { if (group == null) {//

herein refers to the group: b.group (bossGroup, workerGroup) code bossGroup

throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; }

The main method to check two parameters, one group, one channelFactory, where you can think about these two parameters is where and when it was assigned? The answer is to be assigned in the following code block, which is assigned to the Group bossGroup, to assign the BootstrapChannelFactory channelFactory.

ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)

doBind (localAddress) Method

doBind method source code is as follows:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();//1
    final Channel channel = regFuture.channel();//2
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise;
    if (regFuture.isDone()) {
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
    }

    return promise;
} 

doBind This function is the key to our analysis, the main work of this function are as follows:

1, () method to obtain a ChannelFuture regFuture instance by initAndRegister.

2, the method determines whether the abnormality is generated to when executed initAndRegister method regFuture.cause (). If an exception to, directly returns, If no abnormal, the step 3.

3, by regFuture.isDone () method to determine whether initAndRegister finished, finished to return if true, then call for doBind0 socket binding. If not finished false returns to step 4.

4, regFuture will add a ChannelFutureListener listener, when initAndRegister execution is complete, call operationComplete method and perform doBind0 be socket binding.

3, 4, wanted to do something that is a point: calling doBind0 method socket binding.

It will be divided into four parts for each line of code specifically what had been done for detailed analysis.

initAndRegister()

In particular code of the method is as follows:

final ChannelFuture initAndRegister() {
    //

Conclusion: channel here is a NioServerSocketChannel objects, detailed analysis see later

final Channel channel = channelFactory().newChannel();//1 try { init(channel);//2 } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel);//3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }

By internal function name and function call can guess the function did two things:

1, initiates a Channel, in order to initialize, certainly must first obtain a Channel.

final Channel channel = channelFactory().newChannel();//1
init(channel);//2

2, the Channel register.

ChannelFuture regFuture = group().register(channel);//3

Below we will analyze what these few lines of code inside to dry.

final Channel channel = channelFactory().newChannel();

In the previous article (Netty source code analysis (B) —– ServerBootstrap) analysis, we know that b.channel (NioServerSocketChannel.class) functions are: to set the parent class property channelFactory: Object BootstrapChannelFactory class. Wherein this object comprises a BootstrapChannelFactory clazz property: NioServerSocketChannel.class

Therefore, final Channel channel = channelFactory () newChannel (); BootstrapChannelFactory class is invoked in newChannel () method, the specific content of the process is:

private static final class BootstrapChannelFactoryextends Channel> implements ChannelFactory {
    private final Classextends T> clazz;

    BootstrapChannelFactory(Classextends T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

See this class, we can conclusion: final Channel channel = channelFactory () newChannel (); this role to a line of code is generated by an instance of the class NioServerSocketChannel reflection.

NioServerSocketChannel constructor

Constructors NioServerSocketChannel will look like what had been done.

Inheritance architecture NioServerSocketChannel class as follows:

 

 

Which no reference constructor as follows:

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

No reference constructor SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider ().

NewSocket function features: the use of a SocketChannelImpl SelectorProvider generating object.

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
} 

public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}

No argument constructor creates a SocketChannelImpl function object via newSocket

Then call the following constructor, we continue to see

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
} 
//

AbstractNioMessageChannel parent class constructor

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } //

AbstractNioChannel parent class constructor

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT try { ch.configureBlocking(false);//

Set the current non-blocking ServerSocketChannel

} catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } //

AbstractChannel parent class constructor

protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }

When the new NioServerSocketChannel () generates an instance of an object, so many call the above constructor is primarily done two things:

1, to produce a SocketChannelImpl instance of the class, set the ch attribute, and set to non-blocking.

this.ch = ch;
ch.configureBlocking(false);

2, provided config properties

config = new NioServerSocketChannelConfig(this, javaChannel().socket()

3, set SelectionKey.OP_ACCEPT event

this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT

4, set properties unsafe

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

The main role: responsible for the underlying connect, register, read and write operations.

5, pipeline set properties

pipeline = new DefaultChannelPipeline(this);

Each Channel has its own pipeline, when a request event occurs, pipeline responsible for calling the appropriate hander for processing.

These properties will be used later, as the unsafe NioServerSocketChannel object, to achieve specific properties pipeline later analysis.

Conclusion: final Channel channel = channelFactory () newChannel (); This action line is to instantiate a class NioServerSocketChannel by reflections, in which the NioServerSocketChannel class object has several attributes:. SocketChannel, NioServerSocketChannelConfig, SelectionKey.OP_ACCEPT event, NioMessageUnsafe, DefaultChannelPipeline

init(channel)

Init method specific code as follows:

@Override
void init(Channel channel) throws Exception {
    //

1, set the option of a new access channel

final Map, Object> options = options(); synchronized (options) { channel.config().setOptions(options);//NioServerSocketChannelConfig } //

2, the new access channel is provided attr

final Map, Object> attrs = attrs(); synchronized (attrs) { for (Entry, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } //

3, is provided to the pipeline handler

ChannelPipeline p = channel.pipeline(); if (handler() != null) {//

Here handler () SimpleServerHandler second portion is returned .handler (new SimpleServerHandler ()) provided

p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //

p.addLast () added to the pipeline processor serverChannel a ServerBootstrapAcceptor, can be seen from the name, this is an access device, specifically to accept the new request, the new request to throw an event circulator

p.addLast(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }

Function of the function is:

1, set the channel’s options

If not set, the options is empty, the class attributes are defined in the following ServerBootstrap

Map, Object> options = new LinkedHashMap, Object>();

options might look like this:

public  boolean setOption(ChannelOption option, T value) {
    validate(option, value);

    if (option == CONNECT_TIMEOUT_MILLIS) {
        setConnectTimeoutMillis((Integer) value);
    } else if (option == MAX_MESSAGES_PER_READ) {
        setMaxMessagesPerRead((Integer) value);
    } else if (option == WRITE_SPIN_COUNT) {
        setWriteSpinCount((Integer) value);
    } else if (option == ALLOCATOR) {
        setAllocator((ByteBufAllocator) value);
    } else if (option == RCVBUF_ALLOCATOR) {
        setRecvByteBufAllocator((RecvByteBufAllocator) value);
    } else if (option == AUTO_READ) {
        setAutoRead((Boolean) value);
    } else if (option == AUTO_CLOSE) {
        setAutoClose((Boolean) value);
    } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
        setWriteBufferHighWaterMark((Integer) value);
    } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
        setWriteBufferLowWaterMark((Integer) value);
    } else if (option == MESSAGE_SIZE_ESTIMATOR) {
        setMessageSizeEstimator((MessageSizeEstimator) value);
    } else {
        return false;
    }

    return true;
}

2, the channel setting attrs

If not set, the attrs is empty, the class attributes are defined in the following ServerBootstrap

private final Map, Object> attrs = new LinkedHashMap, Object>();

3, the pipeline channel is provided to the handler

Wherein, handler is here: the analysis provided by b.handler (new SimpleServerHandler ()) in the post (Netty source analysis (B) of ServerBootstrap —–) of the object SimpleServerHandler

4, in the pipeline to add a ChannelInitializer objects, wherein the method to rewrite initChannel. The method p.addLast () added to the pipeline processor serverChannel a ServerBootstrapAcceptor, can be seen from the name, this is an access device, specifically to accept the new request, the new request to throw an event loop device

See here, we find that to initialize init just some basic configuration and properties, as well as adding an access point on the pipeline, one designed to accept new connections, and did not start the service.

group().register(channel)

Back initAndRegister method, continue to look config (). Group (). Register (channel) this line of code, config method returns ServerBootstrapConfig, this ServerBootstrapConfig a group calling method, in fact bossGroup. bossGroup call register method.

The previous analysis we know that group is: NioEvenLoopGroup, its successor MultithreadEventLoopGroup, register methods in this class are as follows:

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);//

Call register method NioEvenLoop object, NioEventLoop extends SingleThreadEventLoop

}

Code next () method below, whose function is to select the next NioEventLoop object.

@Override
public EventExecutor next() {
    return chooser.next();//

The next method call MultithreadEventExecutorGroup

}

According to the number of threads nThreads whether a power of 2 to select the chooser, where the two chooser is: PowerOfTwoEventExecutorChooser, GenericEventExecutorChooser, both chooser features are the same, but the remainder are not the same.

next () method returns the object is a NioEvenLoop

private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        return children[childIndex.getAndIncrement() & children.length - 1];//

Method 2 using N times characteristics, using modulo & faster.

} } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }

Conclusion: Due to NioEventLoopGroup in maintaining multiple NioEventLoop, next chooser policy back to the calling method to find the next NioEventLoop, register and execute the method of the object to be registered.

Since NioEventLoop extends SingleThreadEventLoop, NioEventLoop not override this method, so we see the register method SingleThreadEventLoop class

@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

In the first post portion 1 NioServerSocketChannel instantiation unsafe setting properties, particularly to calling a method in the set, so channel.unsafe where () is NioMessageUnsafe instance.

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

channel.unsafe () .register (this, promise) This line of code calls the register method in the AbstracTunSafe class, as follows:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    //

Determine whether the channel has already been registered in the EventLoop

if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //

1 is provided on the eventLoop NioServerSocketChannel

AbstractChannel.this.eventLoop = eventLoop; //

Determine whether the current thread has a thread for the EventLoop, if so, to register directly, if not, add a task to the thread

if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { //

Emphasis

@Override public void run() { register0(promise);//

analysis

} }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }

The above emphasis is register0 (promise) method. The basic logic is:

1, judged by calling eventLoop.inEventLoop () method whether the current thread has a thread for the EventLoop, if so, to register directly, if not, indicating that the EventLoop waiting and has no enforcement power, the second step.

AbstractEventExecutor.java

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

SingleThreadEventExecutor.java

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
} 

2, since the time of the EventLoop threads in this case has no enforcement power, but we can submit a job to the thread, such as the EventLoop threads have the right to perform naturally will perform this task, and the task is responsible for calling register0 method so it will achieve the purpose of calling register0 method.

See below register0 this method, the specific code is as follows:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        doRegister();
        registered = true;
        safeSetSuccess(promise);
        //

Executed, the console output: channelRegistered

pipeline.fireChannelRegistered(); if (isActive()) { //

analysis

pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }

In the above code, the method is complete the registration NioServerSocketChannel by calling the doRegister (), the specific code of the method is as follows:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
} 

protected SelectableChannel javaChannel() {
    return ch;
} 

In the example analysis NioServerSocketChannel portion of a post, we know that an instance SocketChannelImpl class javaChannel where () is generated when instantiated NioServerSocketChannel the method returns ch, and is set to non-blocking, particularly see Bowen part 1.

selectionKey = javaChannel () register (eventLoop () selector, 0, this.);. ServerSocketChannel registration is completed to the Selector.

Under review, eventLoop here (). Selector What is? The answer is: KQueueSelectorImpl object.

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}

private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    //

... omitted part of the code

return selector; }

After completing registration ServerSocketChannel, then execute pipeline.fireChannelRegistered method.

public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
    return this;
}

We see that the parameter passed by InvokechannelRegistered (this.head) is head, which we’ll talk about in the next article, continue to look down.

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }

}

Look at Next.InvokechannelRegistered ();

private void invokeChannelRegistered() {
    if (this.invokeHandler()) {
        try {
            ((ChannelInboundHandler)this.handler()).channelRegistered(this);
        } catch (Throwable var2) {
            this.notifyHandlerException(var2);
        }
    } else {
        this.fireChannelRegistered();
    }

}

Then take a look at this.handler (), it is actually the head of the handler ()

public ChannelHandler handler() {
    return this;
}

Returns this, that the head then look channelRegistered (this)

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
    ctx.fireChannelRegistered();
}

Continue to look ctx.fireChannelRegistered ();

public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(this.findContextInbound());
    return this;
}

We look this.findContextInbound ()

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;

    do {
        ctx = ctx.next;
    } while(!ctx.inbound);

    return ctx;
}

We see ctx = ctx.next; actually start looking from the head, to find the first inbound hander

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }

}

The last execution next.invokeChannelRegistered ();

maintained in the pipeline handler list, remember before .handler (new SimpleServerHandler ()) describes this initialization handler is added to this handler in the pipeline section 1.2 in the analysis of the present Bowen by traversing the list, the type of execution InBound handler method of channelRegistered

Therefore, the implementation here, we went back to the console output: channelRegistered, this line item.

Here, we will doBind method final ChannelFuture regFuture = initAndRegister (); to analyze finished, the conclusion is as follows:

1, is generated by reflecting a NioServerSocketChannle object.

2, the initialization is completed

3, NioServerSocketChannel been registered.

Next, we analyze the remaining part of the code doBind main method of doing something,

Source code is as follows:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();//1
    final Channel channel = regFuture.channel();//2
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise;
    if (regFuture.isDone()) {
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
    }

    return promise;
} 

doBind0(regFuture, channel, localAddress, promise);

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

The main function is to submit a Runnable task to NioEventLoop thread for processing. Here take a look at the execute method NioEventLoop class

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();//

Determine whether the current thread for thread NioEventLoop associated, and if so, add the task to the task queue, and if not, to start the thread, and then add the task to the task queue to go

if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); //

in case

if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }

When the task is submitted by the thread execution will execute channel.bind (localAddress, promise) .addListener (ChannelFutureListener.CLOSE_ON_FAILURE) this line, this line of code completion functions are: to achieve binding channel and the port.

details as follows:

AbstractChannel.java    

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

In this method, the method is called directly bind the pipeline, when the pipeline DefaultChannelPipeline examples herein.

DefaultChannelPipeline.java 

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

In the above method directly call the bind method TailContext example tail, tail in the next blog post detailed description. Continue to look at examples of tail bind method

AbstractChannelHandlerContext.java   

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    //

... omitted validity check

final AbstractChannelHandlerContext next = findContextOutbound();// EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }

This function bind above this line of code: final AbstractChannelHandlerContext next = findContextOutbound (); task is completed in the pipeline held in AbstractChannelHandlerContext doubly linked list of nodes from the start node tail end of a first outbound forward looking = true the handler node.

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

In DefaultChannelPipeline the constructor will instantiate two objects: head and tail, and a head and tail formed doubly linked list. HeadContext head is an example, and it implements the interface and ChannelInboundHandler ChannelOutboundHandler interfaces, and its outbound field is true. TailContext is an example of the tail, ChannelInboundHandler interfaces it implements and which outbound field is false, inbound field is true. Based on this method called findContextOutbound in a bind as a function of the object is actually found AbstractChannelHandlerContext head.

Continue to look to find the doubly linked list in the first pipelie outbound = true after the AbstractChannelHandlerContext head node, and then calls the node invokeConnect method, the code of the method are as follows:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

handler HeadContext class () method code is as follows:

@Override
public ChannelHandler handler() {
    return this;
}

This method returns itself, because HeadContext due to their inherited AbstractChannelHandlerContext and implements the interface ChannelHandler to have Context Handler and double features.

Continue to look to see HeadContext class bind method, as follows:

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

unsafe This field is initialized in the HeadContext constructor, as follows:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

And this constructor pipeline.channel () unsafe () This line of code is returned instance constructors NioServerSocketChannel a study of this class is initialized earlier in the post, as follows:

unsafe = newUnsafe();//

newUnsafe () method returns the object NioMessageUnsafe.

Next look bind method NioMessageUnsafe class (precisely: the method AbstractUnsafe), the method specifically bind class code as follows:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        //

... omitted part of the code

boolean wasActive = isActive(); try { doBind(localAddress);//

Core code

} catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }

The core code above is: DOBind (LocalAddress); it should be noted that this DOBind method is in the NiOSERVersocketChannel class, not in the other class.

doBind NioServerSocketChannel class method code as follows:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

In the above process javaChannel () method returns the Java NIO ServerSocketChannel instance is initialized NioServerSocketChannel instance generated (more specifically point ServerSocketChannelImple instance). Is equivalent to the statement serverSocketChannel.socket (). Bind (localAddress) completed the designated port binding, so you start listening to this port. After the success of the port binding, is here called the channelActive our custom handler method, prior to binding, isActive () method returns false, returns true after binding.

@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}

This way, it enters the code block of if condition as follows

if (!wasActive && isActive()) {
    invokeLater(new OneTimeTask() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}    

private void invokeLater(Runnable task) {
    try {
            //

Omit part of the code

eventLoop().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }

Then start executing Pipeline.FireChannelActive (); this line of code, the specific call chain for this line is as follows:

DefaultChannelPipeline.java

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
    return this;
}

private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

to sum up

Finally, we do under the summary, netty start a service through which flow 1. Set the startup class parameters, the most important is to set channel2. Create a server corresponding channel, create major components, including ChannelConfig, ChannelId, ChannelPipeline, ChannelHandler, Unsafe and other 3.init initialize the NioServerSocketChannel, set some attr, option, and set the sub-channel of attr, option, channel server to add a new channel access control, and event trigger addHandler

4.config () .group () .register (channel) uses ServerBootstrap’s BossGroup to get NioEventLoop based on the group length, registering NioserVersocketChannel with on selector in NioEvenentLoop and then triggers the ChannelRegistered event

5. call to jdk underlying do port binding and trigger active event

 

Leave a Reply