Java NIO picks a different SelectorProvider depending on the operating system: macOS uses KQueueSelectorProvider, Windows uses WindowsSelectorProvider, and Linux uses EPollSelectorProvider (epoll mode, for Linux kernels >= 2.6) or PollSelectorProvider (poll mode). Each OS clearly has its own implementation of the NIO Selector. Since version 4.0.16, Netty has additionally provided a native socket transport for Linux via JNI.
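You can check which provider your own JVM picks with a one-liner (a minimal sketch; ShowSelectorProvider is just an illustrative class name):

import java.nio.channels.spi.SelectorProvider;

public class ShowSelectorProvider {
    public static void main(String[] args) {
        //on Linux this typically prints sun.nio.ch.EPollSelectorProvider,
        //on macOS sun.nio.ch.KQueueSelectorProvider, on Windows sun.nio.ch.WindowsSelectorProvider
        System.out.println(SelectorProvider.provider().getClass().getName());
    }
}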

The Oracle JDK picks a suitable Selector automatically, and on Linux it already defaults to epoll. So why does Netty still provide its own epoll-based implementation?

This has been explained on Stack Overflow; for details see the official native-transports documentation and Tomcat Native.

If you are running on linux you can use EpollEventLoopGroup and so get better performance, less GC and have more advanced features that are only available on linux.

  • Netty's epoll transport uses edge-triggered epoll, whereas Java NIO uses level-triggered epoll.
  • Netty's epoll transport exposes configuration options that NIO does not, such as TCP_CORK and SO_REUSEPORT.
  • It is implemented in C, which means less GC pressure and less synchronized code.

In short, on Linux EpollEventLoopGroup gives you less GC, more advanced features, and better performance.
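Those epoll-only options are exposed as EpollChannelOption constants and can be set on a bootstrap that uses the Epoll* classes. A minimal sketch (EpollOptions and applyNativeOptions are made-up names; the options themselves are real Netty constants):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.epoll.EpollChannelOption;

final class EpollOptions {
    //applies two epoll-only options; with an NIO bootstrap these would just be
    //rejected as unknown options, since they have no NIO equivalent
    static void applyNativeOptions(ServerBootstrap b) {
        b.option(EpollChannelOption.SO_REUSEPORT, true)
         .childOption(EpollChannelOption.TCP_CORK, true);
    }
}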

So how do you use the native socket transport (epoll)?

You only need to swap the corresponding classes:

NioEventLoopGroup      → EpollEventLoopGroup
NioEventLoop           → EpollEventLoop
NioServerSocketChannel → EpollServerSocketChannel
NioSocketChannel       → EpollSocketChannel
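A minimal server bootstrap with the swap applied might look like this (a sketch; the port and the LoggingHandler are placeholders):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;

public class EpollServer {
    public static void main(String[] args) throws InterruptedException {
        //EpollEventLoopGroup replaces NioEventLoopGroup,
        //EpollServerSocketChannel replaces NioServerSocketChannel
        EventLoopGroup boss = new EpollEventLoopGroup(1);
        EventLoopGroup worker = new EpollEventLoopGroup();
        try {
            new ServerBootstrap()
                .group(boss, worker)
                .channel(EpollServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler()); //placeholder handler
                    }
                })
                .bind(8080).sync()
                .channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}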

Many well-regarded codebases already handle this compatibility switch. RocketMQ, for example:

//excerpted from org.apache.rocketmq.remoting.netty
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        //prefer the native epoll transport when it is actually usable, otherwise fall back to NIO
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        loadSslContext();
    }
    private boolean useEpoll() {
        //epoll is used only on Linux, when enabled in the config,
        //and when Netty's native library actually loaded (Epoll.isAvailable())
        return RemotingUtil.isLinuxPlatform()
            && nettyServerConfig.isUseEpollNativeSelector()
            && Epoll.isAvailable();
    }
}
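A stripped-down version of the same switch, outside RocketMQ, might look like this (a sketch; TransportSelector and its methods are made-up names, while Epoll.isAvailable() and the channel/group classes are real Netty API):

import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

//hypothetical helper: pick the native transport when it can actually be loaded,
//otherwise fall back to NIO so the same jar runs on macOS/Windows as well
final class TransportSelector {
    static boolean useEpoll() {
        return System.getProperty("os.name", "").toLowerCase().contains("linux")
            && Epoll.isAvailable();
    }

    static EventLoopGroup newGroup(int threads) {
        return useEpoll() ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    }

    static Class<? extends ServerChannel> serverChannelClass() {
        return useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
    }
}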

How do we make sense of this source code?

Start with EventExecutorGroup, the topmost interface above NioEventLoopGroup. It extends java.util.concurrent.ScheduledExecutorService, so it can schedule and execute tasks (see the short sketch after the list below).

  • An EventExecutorGroup manages n EventExecutors internally; next() returns one of them.
  • An EventExecutor is itself an EventExecutorGroup (a subtype of it).
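Both points show up when a group is used directly. A minimal sketch (GroupAsScheduler is a made-up class name; the schedule/next/execute calls are the real API):

import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.TimeUnit;

public class GroupAsScheduler {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        //schedule() is inherited from ScheduledExecutorService; the returned future can be awaited
        group.schedule(() -> System.out.println("scheduled on " + Thread.currentThread().getName()),
                100, TimeUnit.MILLISECONDS).syncUninterruptibly();
        //next() hands a plain task to one of the group's children (a NioEventLoop)
        group.next().execute(() -> System.out.println("plain task on " + Thread.currentThread().getName()));
        group.shutdownGracefully();
    }
}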


EventExecutorGroup acts like a boss: whenever there is work to do, it sends one of its workers (an EventExecutor) to do it.

In MultithreadEventExecutorGroup every worker is a SingleThreadEventExecutor, and the number of workers is fixed at construction time. The boss's dispatch logic is as simple as it gets: it just rotates through the workers (the next() method). In NioEventLoopGroup, each worker is a NioEventLoop.

//EventExecutorGroup extends the JDK ScheduledExecutorService, so it can schedule and execute tasks
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
}
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    //every submitted task is simply delegated to one of the children returned by next()
    public Future<?> submit(Runnable task) {
        return this.next().submit(task);
    }
    public <T> Future<T> submit(Callable<T> task) {
        return this.next().submit(task);
    }
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger terminatedChildren;
    private final Promise<?> terminationFuture;
    private final EventExecutorChooser chooser;

    public EventExecutor next() {
        return this.chooser.next();
    }
}
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        //round-robin dispatch: Math.abs() guards against a negative index once idx wraps around
        public EventExecutor next() {
            return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
        }
    }
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    public EventLoop next() {
        return (EventLoop)super.next();
    }

    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    //registering a Channel binds it to one EventLoop chosen by next(); that EventLoop then handles all of the Channel's IO
    public ChannelFuture register(Channel channel) {
        return this.next().register(channel);
    }

    public ChannelFuture register(ChannelPromise promise) {
        return this.next().register(promise);
    }
}
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public void rebuildSelectors() {
        //replace the Selector of every child NioEventLoop (used to work around the JDK epoll 100% CPU spin bug)
        for (EventExecutor e : this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }
}

  • NioEventLoopGroup is essentially a thread pool.
  • NioEventLoopGroup starts n NioEventLoops in the background to handle Channel events.
  • Each NioEventLoop serves m Channels and can be thought of as the Selector in plain NIO. It has two jobs: acting as the IO thread that handles IO events, and acting as an ordinary thread that runs tasks submitted through execute() and similar methods (see the sketch after this list).
  • NioEventLoopGroup takes NioEventLoops from its array in turn to handle Channels.
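The second job, running ordinary tasks, looks like this in practice (a sketch; EventLoopTasks and writeSafely are hypothetical names, while the Channel/EventLoop calls are real Netty API):

import io.netty.channel.Channel;

final class EventLoopTasks {
    //hand work to the event loop that owns this channel,
    //so the channel's handlers are never touched from a foreign thread
    static void writeSafely(Channel channel, Object msg) {
        if (channel.eventLoop().inEventLoop()) {
            channel.writeAndFlush(msg);            //already on the IO thread
        } else {
            channel.eventLoop().execute(() -> channel.writeAndFlush(msg)); //submitted as a plain task
        }
    }
}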

A NioEventLoop handles both IO events and other tasks on a single thread (a single-threaded Reactor). It tries to split its time between the two according to a configured ratio (50% by default), but it cannot guarantee that split. The practical consequence: never run time-consuming operations (such as database calls) inside a NioEventLoop; they block the event loop and hurt the responsiveness of the whole application.
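The usual way to keep slow work off the event loop is to give blocking handlers their own executor group when adding them to the pipeline. A sketch (OffloadingInitializer and the anonymous handler body are made up for illustration; the addLast(group, ...) overload is real):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

public class OffloadingInitializer extends ChannelInitializer<SocketChannel> {
    //separate pool for slow work so the NioEventLoop threads stay free for IO
    private static final DefaultEventExecutorGroup BLOCKING_GROUP = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) {
        //a handler added with an explicit executor group runs its callbacks on that group,
        //not on the channel's NioEventLoop
        ch.pipeline().addLast(BLOCKING_GROUP, "blocking", new SimpleChannelInboundHandler<Object>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
                //pretend this is a slow database call
            }
        });
    }
}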
