Netty source code analysis (1): NioEventLoopGroup

When Netty comes up, the first thing people mention is the threading model behind its high concurrency, and any discussion of that threading model has to cover the NioEventLoopGroup thread pool. So let's get into the topic.

Threading model

First, look at an example of how Netty is used:

package com.wrh.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public final class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }
}

This post analyzes the first two lines of main and looks at what the NioEventLoopGroup constructor does. The rest will be analyzed in other posts.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

As the code shows, two thread pools are used, bossGroup and workerGroup. Why define two thread pools? That brings us to Netty's threading model.

Netty's threading model is known as the Reactor model. In the diagram of this model, mainReactor corresponds to bossGroup: this thread pool accepts client connection requests and registers each accepted connection with a subReactor. subReactor corresponds to workerGroup, which handles reads and writes on established client channels. The diagram also contains a ThreadPool, which is the thread pool for the actual business logic. In ordinary cases the subReactor threads can be reused for this, which is what my project does, but the official recommendation is still to use a separate ThreadPool for time-consuming business logic.
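To make the last point concrete, here is a minimal sketch of handing slow work to a separate pool so the I/O thread is not blocked. It uses only JDK executors as stand-ins: the names OffloadSketch, ioLoop, businessPool and slowBusiness are made up for illustration and are not Netty APIs.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only stand-in for the idea above; none of these names come from Netty.
final class OffloadSketch {

    /** Simulated time-consuming business logic. */
    static String slowBusiness(String input) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input.toUpperCase(Locale.ROOT);
    }

    /** "Reads" on the I/O thread, does the slow part on the business pool, then "writes" on the I/O thread. */
    static String handle(String request) {
        // Plays the role of one subReactor thread.
        ExecutorService ioLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-loop"));
        // Plays the role of the separate business ThreadPool.
        ExecutorService businessPool = Executors.newFixedThreadPool(2, r -> new Thread(r, "business"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> request.trim(), ioLoop)
                    .thenApplyAsync(s -> slowBusiness(s) + " on " + Thread.currentThread().getName(), businessPool)
                    .thenApplyAsync(s -> s + ", written on " + Thread.currentThread().getName(), ioLoop)
                    .join();
        } finally {
            ioLoop.shutdown();
            businessPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("  hello "));
    }
}
```

Running main prints "HELLO on business, written on io-loop": the slow step never occupies the I/O thread. In Netty itself the usual way to get the same effect is to register the slow handler with its own EventExecutorGroup via ChannelPipeline.addLast(group, handler).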

NioEventLoopGroup Constructor

The NioEventLoopGroup constructors are as follows:

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, selectorProvider);
} 

These NioEventLoopGroup constructors ultimately call the following constructor of the parent class MultithreadEventLoopGroup:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

From this constructor we can see that when the object is created with EventLoopGroup workerGroup = new NioEventLoopGroup(), that is, without specifying the number of threads, Netty uses its default thread count. If a thread count is specified, that value is used.

The code that determines the default thread count is as follows:

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

SystemPropertyUtil.getInt returns the value of the system property for the given key (here "io.netty.eventLoopThreads"). If the property cannot be obtained, it returns the supplied default, which here is twice the value of Runtime.getRuntime().availableProcessors().

Conclusion: if the startup parameter -Dio.netty.eventLoopThreads is not set (that is, the io.netty.eventLoopThreads property has no value), the default number of threads is the number of CPU cores multiplied by 2.
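The fallback logic can be tried out in isolation with a small sketch like the one below. It is an approximation under stated assumptions: EventLoopThreadsSketch is a made-up class, and the JDK's Integer.getInteger stands in for Netty's SystemPropertyUtil.getInt (the two may differ in how they parse unusual values).

```java
// Hypothetical helper (not a Netty class) that mirrors the fallback logic quoted above.
final class EventLoopThreadsSketch {
    // Same property key as in the Netty static initializer.
    static final String KEY = "io.netty.eventLoopThreads";

    /** The system property if it is set and parseable, otherwise cores * 2; never below 1. */
    static int defaultThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger returns the fallback when the property is missing or not a number.
        return Math.max(1, Integer.getInteger(KEY, fallback));
    }

    /** nThreads == 0 means "use the default", as in MultithreadEventLoopGroup. */
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("new NioEventLoopGroup(1) -> " + resolve(1) + " thread(s)");
        System.out.println("new NioEventLoopGroup()  -> " + resolve(0) + " thread(s)");
    }
}
```

Starting the JVM with -Dio.netty.eventLoopThreads=3 makes the second line report 3 instead of twice the processor count.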

Next, the MultithreadEventLoopGroup constructor calls the constructor of its parent class MultithreadEventExecutorGroup, so let's look at that constructor:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    children = new SingleThreadEventExecutor[nThreads];
    // Initialize the chooser with a different strategy depending on whether the thread count is a power of 2
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    // Create nThreads NioEventLoop objects and store them in the children array
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // If newChild fails, shut down the NioEventLoops that were already created successfully
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
}

The constructor does the following three things:

1. Creates a thread factory: threadFactory = newDefaultThreadFactory();

MultithreadEventExecutorGroup.java

protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass()); // getClass() is NioEventLoopGroup.class
}

DefaultThreadFactory.java

public DefaultThreadFactory(Class<?> poolType) {
    this(poolType, false, Thread.NORM_PRIORITY);
}

2. Initializes the chooser with a different strategy depending on whether the thread count is a power of 2.

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

3. Creates nThreads NioEventLoop objects and stores them in the children array. Each one is created by calling the newChild method.

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

Here the arguments passed to the NioEventLoop constructor are the NioEventLoopGroup, the DefaultThreadFactory, and the SelectorProvider.
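Regarding step 2, the post does not show what the two chooser strategies actually do. The sketch below is my understanding of the idea rather than the Netty classes themselves: both pick children in round-robin order, and the power-of-two variant replaces the modulo with a cheaper bit mask. ChooserSketch and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the two chooser strategies; not the Netty classes themselves.
final class ChooserSketch {

    static boolean isPowerOfTwo(int val) {
        // val & -val isolates the lowest set bit; it equals val only when at most one bit is set.
        return (val & -val) == val;
    }

    /** Round-robin index for a power-of-two length: a bit mask instead of a division. */
    static int powerOfTwoIndex(int counter, int length) {
        return counter & (length - 1);
    }

    /** Round-robin index for any positive length. */
    static int genericIndex(int counter, int length) {
        return Math.abs(counter % length);
    }

    private final AtomicInteger childIndex = new AtomicInteger();
    private final int length;

    ChooserSketch(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length: " + length + " (expected: > 0)");
        }
        this.length = length;
    }

    /** Index of the next child, picking the strategy the same way the constructor above does. */
    int next() {
        int counter = childIndex.getAndIncrement();
        return isPowerOfTwo(length) ? powerOfTwoIndex(counter, length) : genericIndex(counter, length);
    }

    public static void main(String[] args) {
        ChooserSketch four = new ChooserSketch(4);
        ChooserSketch three = new ChooserSketch(3);
        for (int i = 0; i < 6; i++) {
            System.out.println("length 4 -> " + four.next() + ", length 3 -> " + three.next());
        }
    }
}
```

Note that (val & -val) == val is also true for 0. That is harmless in the constructor above because nThreads <= 0 has already been rejected before the check runs.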

NioEventLoop constructor analysis

Since newChild creates a new NioEventLoop object, let's look at this class and its parent classes.

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}

Next, look at the constructor of the parent class SingleThreadEventLoop:

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}

It in turn directly calls the constructor of its parent class SingleThreadEventExecutor, so let's keep going:

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }

    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;//false

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                // Calls the run method of class NioEventLoop
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });

    taskQueue = newTaskQueue();
}

protected Queue<Runnable> newTaskQueue() {
    return new LinkedBlockingQueue<Runnable>();
}

This constructor mainly does the following two things:

1. Uses the ThreadFactory to create a Thread, passing in a Runnable. The overridden run method of this Runnable is long, but the key point is simply that it calls the run method of class NioEventLoop.

2. Initializes taskQueue with a LinkedBlockingQueue.
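The combination of one thread plus one task queue can be sketched with the JDK alone. The class below is a simplified illustration, not Netty code: SingleThreadLoopSketch, runLoop and the POISON marker are hypothetical, the state machine and shutdown handling shown above are left out, and starting the thread lazily on the first execute call is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified cousin of SingleThreadEventExecutor.
final class SingleThreadLoopSketch implements Executor {
    // Marker task that tells the loop to stop (an assumption of this sketch).
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final Thread thread;

    SingleThreadLoopSketch(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        // The thread is created here but not started yet.
        thread = threadFactory.newThread(this::runLoop);
    }

    /** The loop body: take tasks one by one until the stop marker arrives. */
    private void runLoop() {
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                if (task == POISON) {
                    return;
                }
                try {
                    task.run();
                } catch (Throwable t) {
                    System.err.println("Unexpected exception from a task: " + t);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        taskQueue.add(task);
        // Start the thread on first use; compareAndSet guarantees a single start.
        if (started.compareAndSet(false, true)) {
            thread.start();
        }
    }

    /** Queues the stop marker; tasks queued before it still run. */
    void shutdown() {
        execute(POISON);
    }

    /** Waits for the loop thread to finish. */
    void join() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs three tasks and returns "<index>@<thread name>" in execution order. */
    static List<String> demo() {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch(r -> new Thread(r, "loop-1"));
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 3; i++) {
            final int n = i;
            loop.execute(() -> seen.add(n + "@" + Thread.currentThread().getName()));
        }
        loop.shutdown();
        loop.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains a FIFO queue, the three tasks run in submission order on the same thread, so main prints [0@loop-1, 1@loop-1, 2@loop-1].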

The code of the newThread method is as follows:

DefaultThreadFactory.java

@Override
public Thread newThread(Runnable r) {
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());

    try {
        // Check whether the thread is a daemon thread and set the flag accordingly
        if (t.isDaemon()) {
            if (!daemon) {
                t.setDaemon(false);
            }
        } else {
            if (daemon) {
                t.setDaemon(true);
            }
        }

        // Set its priority
        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}

protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(r, name);
}

FastThreadLocalThread.java

public FastThreadLocalThread(Runnable target, String name) {
    super(target, name);// FastThreadLocalThread extends Thread 
} 

Here you can see that, underneath, the thread is still created in a way similar to Thread thread = new Thread(r).
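For comparison, a thread factory with the same three concerns (name, daemon flag, priority) can be written in a few lines against the JDK. This is a simplified sketch, not the real DefaultThreadFactory: the class name NamedThreadFactorySketch and the "poolName-id" naming scheme are assumptions made for illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplified factory; the real DefaultThreadFactory has more options.
final class NamedThreadFactorySketch implements ThreadFactory {
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;
    private final boolean daemon;
    private final int priority;

    NamedThreadFactorySketch(String poolName, boolean daemon, int priority) {
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("priority: " + priority);
        }
        this.prefix = poolName + '-';
        this.daemon = daemon;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Each thread gets the pool prefix plus a running number.
        Thread t = new Thread(r, prefix + nextId.incrementAndGet());
        // A new thread inherits daemon status and priority from its creator, so adjust only if needed.
        if (t.isDaemon() != daemon) {
            t.setDaemon(daemon);
        }
        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactorySketch("worker", false, Thread.NORM_PRIORITY);
        Thread t = factory.newThread(() -> System.out.println("running on " + Thread.currentThread().getName()));
        t.start();
    }
}
```

Giving each pool its own prefix makes thread dumps much easier to read, which is presumably why Netty passes the group class to its factory.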

From the above, a NioEventLoop object has the following four properties initialized:

1. parent, the NioEventLoopGroup (held in the parent class SingleThreadEventExecutor)

2. selector

3. provider

4. thread (held in the parent class SingleThreadEventExecutor)

Summary

NioEventLoopGroup can be summarized as follows:

1. If the number of threads is not specified, it defaults to the number of CPU cores * 2.

2. The chooser is initialized with a different strategy depending on whether the thread count is a power of 2.

3. nThreads NioEventLoop objects are created and stored in the children array.

A NioEventLoop can be understood as a thread, and it has the following properties:

1. parent, the NioEventLoopGroup (held in the parent class SingleThreadEventExecutor)

2. selector

3. provider

4. thread (held in the parent class SingleThreadEventExecutor)

Put more plainly: NioEventLoopGroup is a thread pool and NioEventLoop is one thread in it. A NioEventLoopGroup holds N NioEventLoop threads.

 
