⼀个公式看懂:为什么DUBBO线程池会打满
欢迎⼤家关注「JAVA前线」查看更多精彩分享⽂章,主要包括源码分析、实际应⽤、架构思维、职场分享、产品思考等等,欢迎⼤家加我「java_front」⼀起交流学习
0 ⽂章概述
⼤家可能都遇到过DUBBO线程池打满这个问题,刚开始遇到这个问题可能会⽐较慌,常见⽅案可能就是重启服务,但也不知道重启是否可以解决。我认为重启不仅不能解决问题,甚⾄有可能加剧问题,这是为什么呢?本⽂我们就⼀起分析DUBBO线程池打满这个问题。
1 基础知识
1.1 DUBBO线程模型
1.1.1 基本概念
DUBBO底层⽹络通信采⽤Netty框架,我们编写⼀个Netty服务端进⾏观察:
public class NettyServer {
public static void main(String[] args)throws Exception {
EventLoopGroup bossGroup =new NioEventLoopGroup(1);
EventLoopGroup workerGroup =new NioEventLoopGroup(8);
try{
ServerBootstrap bootstrap =new ServerBootstrap();
.
channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch)throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(7777).sync();
System.out.println("服务端准备就绪");
channelFuture.channel().closeFuture().sync();
}catch(Exception ex){
System.out.Message());
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
BossGroup线程组只有⼀个线程处理客户端连接请求,连接完成后将完成三次握⼿的SocketChannel连接分发给WorkerGroup处理读写请求,这两个线程组被称为「IO线程」。
我们再引出「业务线程」这个概念。服务⽣产者接收到请求后,如果处理逻辑可以快速处理完成,那么可以直接放在IO线程处理,从⽽减少线程池调度与上下⽂切换。但是如果处理逻辑⾮常耗时,或者会发起新IO请求例如查询数据库,那么必须派发到业务线程池处理。
DUBBO提供了多种线程模型,选择线程模型需要在配置⽂件指定dispatcher属性:
<dubbo:protocol name="dubbo" dispatcher="all"/>
<dubbo:protocol name="dubbo" dispatcher="direct"/>
<dubbo:protocol name="dubbo" dispatcher="message"/>
<dubbo:protocol name="dubbo" dispatcher="execution"/>
<dubbo:protocol name="dubbo" dispatcher="connection"/>
不同线程模型在选择是使⽤IO线程还是业务线程,DUBBO官⽹⽂档说明如下:
all
所有消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,⼼跳
direct
所有消息都不派发到业务线程池,全部在IO线程直接执⾏
message
只有请求响应消息派发到业务线程池,其它连接断开事件,⼼跳等消息直接在IO线程执⾏
execution
只有请求消息派发到业务线程池,响应和其它连接断开事件,⼼跳等消息直接在IO线程执⾏
connection
在IO线程上将连接断开事件放⼊队列,有序逐个执⾏,其它消息派发到业务线程池
1.1.2 确定时机
⽣产者和消费者在初始化时会确定线程模型:
// ⽣产者
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler)throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
// 消费者
public class NettyClient extends AbstractClient {
public NettyClient(final URL url,final ChannelHandler handler)throws RemotingException {
super(url,wrapChannelHandler(url, handler));
}
}
⽣产者和消费者默认线程模型都是AllDispatcher,ChannelHandlers.wrap⽅法可以获取Dispatch⾃适应扩展点。如果我们在配置⽂件中指定dispatcher,扩展点加载器会从URL获取属性值加载对应线程模型。本⽂以⽣产者为例进⾏分析:
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler)throws RemotingException {
// ChannelHandlers.wrap确定线程策略
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url){
return new MultiMessageHandler(new ExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(ha ndler, url)));
}
}
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
@Adaptive({Constants.DISPATCHER_KEY,"channel.handler"})
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
1.1.3 源码分析
我们分析其中两个线程模型源码,其它线程模型请阅读DUBBO源码。AllDispatcher模型所有消息都派
发到业务线程池,包括请求,响应,连接事件,断开事件,⼼跳:
public class AllDispatcher implements Dispatcher {
// 线程模型名称
public static final String NAME ="all";
// 具体实现策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url){
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
@Override
public void connected(Channel channel)throws RemotingException {
// 连接完成事件交给业务线程池
ExecutorService cexecutor =getExecutorService();
try{
}catch(Throwable t){
}catch(Throwable t){
throw new ExecutionException("connect event", channel,getClass()+" error when process connected event", t);
}
}
@Override
public void disconnected(Channel channel)throws RemotingException {
// 断开连接事件交给业务线程池
ExecutorService cexecutor =getExecutorService();
try{
}catch(Throwable t){
throw new ExecutionException("disconnect event", channel,getClass()+" error when process disconnected event", t);
}
}
@Override
public void received(Channel channel, Object message)throws RemotingException {
// 请求响应事件交给业务线程池
ExecutorService cexecutor =getExecutorService();
try{
}catch(Throwable t){
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request =(Request)message;
if(request.isTwoWay()){
String msg ="Server side("+ Ip()+","+ Port()+") threadpool is exhausted ,detail msg:"+ t.getMessage(); Response response =new Id(), Version
());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel,getClass()+" error when process received event", t);
为什么使用bootstrap?}
}
@Override
public void caught(Channel channel, Throwable exception)throws RemotingException {
// 异常事件交给业务线程池
ExecutorService cexecutor =getExecutorService();
try{
}catch(Throwable t){
throw new ExecutionException("caught event", channel,getClass()+" error when process caught event", t);
}
}
}
DirectDispatcher策略所有消息都不派发到业务线程池,全部在IO线程直接执⾏:
public class DirectDispatcher implements Dispatcher {
// 线程模型名称
public static final String NAME ="direct";
// 具体实现策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url){
// 直接返回handler表⽰所有事件都交给IO线程处理
return handler;
}
}
1.2 DUBBO线程池策略
1.2.1 基本概念
上个章节分析了线程模型,我们知道不同的线程模型会选择使⽤还是IO线程还是业务线程。如果使⽤业务线程池,那么使⽤什么线程池策略是本章节需要回答的问题。DUBBO官⽹线程派发模型图展⽰了线程模型和线程池策略的关系:
DUBBO提供了多种线程池策略,选择线程池策略需要在配置⽂件指定threadpool属性:
<dubbo:protocol name="dubbo" threadpool="fixed" threads="100"/>
<dubbo:protocol name="dubbo" threadpool="cached" threads="100"/>
<dubbo:protocol name="dubbo" threadpool="limited" threads="100"/>
<dubbo:protocol name="dubbo" threadpool="eager" threads="100"/>
不同线程池策略会创建不同特性的线程池:
fixed
包含固定个数线程
cached
线程空闲⼀分钟会被回收,当新请求到来时会创建新线程
limited
线程个数随着任务增加⽽增加,但不会超过最⼤阈值。空闲线程不会被回收
eager
当所有核⼼线程数都处于忙碌状态时,优先创建新线程执⾏任务,⽽不是⽴即放⼊队列
1.2.2 确定时机
本⽂以AllDispatcher为例分析线程池策略确定时机:
public class AllDispatcher implements Dispatcher {
public static final String NAME ="all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url){
return new AllChannelHandler(handler, url);
}
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url){
super(handler, url);
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论