EpollEventLoopGroup 与 NioEventLoopGroup你知道吗
Java NIO根据操作系统不同,比如 macosx 是KQueueSelectorProvider、windows有WindowsSelectorProvider、Linux有EPollSelectorProvider (Linux kernels >= 2.6,是epoll模式)或PollSelectorProvider(selector模式), 足以可见不同的系统对nio中的Selector有不同的实现,自4.0.16起,Netty为Linux通过JNI的方式提供了native socket transport。
Oracle jdk会自动选择合适的Selector,Oracle JDK在Linux已经默认使用epoll方式, 为什么netty还要提供一个基于epoll的实现呢?
stackoverflow也解释过,具体可参阅官方native-transports,Tomcat Native
If you are running on linux you can use EpollEventLoopGroup and so get better performance, less GC and have more advanced features that are only available on linux.
- Netty的 epoll transport使用 epoll edge-triggered 而 java的 nio 使用 level-triggered
- Netty的 epoll transport 暴露了更多的nio没有的配置参数, 如 TCP_CORK, SO_REUSEADDR等。
- C代码,更少GC,更少synchronized
总之,linux上使用EpollEventLoopGroup会有较少的gc有更高级的特性,性能更好~!
那该如何使用native socket transport(epoll)呢?
其实只需将相应的类替换即可
NioEventLoopGroup | EpollEventLoopGroup |
NioEventLoop | EpollEventLoop |
NioServerSocketChannel | EpollServerSocketChannel |
NioSocketChannel | EpollSocketChannel |
很多优秀的源码中对此都进行了兼容性处理,比如rocketmq
//源于org.apache.rocketmq.remoting.netty
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
}
怎么读懂这块的源码?
首先从EventExecutorGroup开始,EventExecutorGroup是NioEventLoopGroup最上层的接口,它继承了java.util.concurrent.ScheduledExecutorService接口,因此它可以调度执行task。
- EventExecutorGroup内部管理了n个EventExecutor,next()方法返回其中的一个
- EventExecutor也是EventExecutorGroup(的子类)
EventExecutorGroup就像一个BOSS,每当有活儿的时候,就派一个小弟(EventExecutor)去干。
MultithreadEventExecutorGroup的每一个小弟都是一个SingleThreadEventExecutor,而且小弟的数量在构造的时候就确定了,这个BOSS的小弟分配逻辑相当简单,无非就是轮流使唤(next方法),这个BOSS的每一个小弟都是一个NioEventLoop。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
}
//实现了ScheduledExecutorService接口
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
public Future<?> submit(Runnable task) {
return this.next().submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return this.next().submit(task);
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren;
private final Promise<?> terminationFuture;
private final EventExecutorChooser chooser;
public EventExecutor next() {
return this.chooser.next();
}
}
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
//调度逻辑,这里调用了Math.abs()方法以防止executors溢出
public EventExecutor next() {
return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
}
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
public EventLoop next() {
return (EventLoop)super.next();
}
protected abstract EventLoop newChild(Executor var1, Object... var2) throws Exception;
public ChannelFuture register(Channel channel) {
return this.next().register(channel);
}
public ChannelFuture register(ChannelPromise promise) {
return this.next().register(promise);
}
}
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public void rebuildSelectors() {
Iterator var1 = this.iterator();
while(var1.hasNext()) {
EventExecutor e = (EventExecutor)var1.next();
((NioEventLoop)e).rebuildSelector();
}
}
}
- NioEventLoopGroup实际上就是个线程池
- NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件
- 每一个NioEventLoop负责处理m个Channel,可以理解为nio中Selector,它有两个职责:一是作为IO线程处理IO事件,二是作为普通的线程处理通过execute等方法提交上来的任务
- NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel
NioEventLoop在单线程里同时处理IO事件和其他任务(单线程模式下的Reactor),NioEventLoop尽量(但不能保证)按照给定的比率(默认为50%)来分配花在这两种事情上的时间。换句话说,我们不应该在NioEventLoop里执行耗时的操作(比如数据库操作),这样会卡死NioEventLoop,降低程序的响应性。
更多推荐
所有评论(0)