⼏种常见的线程池及使⽤场景
为什么要使⽤线程池?
创建线程和销毁线程的花销是⽐较⼤的,这些时间有可能⽐处理业务的时间还要长。这样频繁的创建线程和销毁线程,再加上业务⼯作线程,消耗系统资源的时间,可能导致系统资源不⾜。(我们可以把创建和销毁的线程的过程去掉)
线程池有什么作⽤?
1、提⾼效率 创建好⼀定数量的线程放在池中,等需要使⽤的时候就从池中拿⼀个,这要⽐需要的时候创建⼀个线程对象要快的多。
2、⽅便管理 可以编写线程池管理代码对池中的线程同⼀进⾏管理,⽐如说启动时有该程序创建100个线程,每当有请求的时候,就分配⼀个线程去⼯作,如果刚好并发有101个请求,那多出的这⼀个请求可以排队等候,避免因⽆休⽌的创建线程导致系统崩溃。
说说⼏种常见的线程池及使⽤场景
1、newSingleThreadExecutor
创建⼀个单线程化的线程池,它只会⽤唯⼀的⼯作线程来执⾏任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执⾏。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2、newFixedThreadPool
创建⼀个定长线程池,可控制线程最⼤并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、newCachedThreadPool
创建⼀个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若⽆可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4、newScheduledThreadPool
创建⼀个定长线程池,⽀持定时及周期性任务执⾏。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
线程池不允许使⽤Executors去创建,⽽是通过ThreadPoolExecutor的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源耗尽的风险。 说明:Executors各个⽅法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费⾮常⼤的内存,甚⾄OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最⼤数是Integer.MAX_VALUE,可能会创建数量⾮常多的线程,甚⾄OOM。
Positive example 1:
//org.urrent.BasicThreadFactory
ScheduledExecutorService executorService =
new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()
);
Positive example 2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.shutdown();//gracefully shutdown
Positive example 3:
<bean id="userThreadPool"
class="org.urrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
个⼈在项⽬中⽤到的是第三种,业务需求,每天会有调度服务器会通过http协议请求
<bean id="xxDataThreadPool"
class="org.urrent.ThreadPoolTaskExecutor">
<!-- 核⼼线程数 -->
<property name="corePoolSize" value="50"/>
<!-- 最⼤线程数 -->
<property name="maxPoolSize" value="500"/>
<!-- 队列最⼤长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="10"/>
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="1"/>
<!-- 线程池对拒绝任务(⽆线程可⽤)的处理策略如果已经超过了限制丢弃消息,不进⾏处理 -->
<property name="rejectedExecutionHandler">
<bean class="urrent.ThreadPoolExecutor$DiscardPolicy"/>
</property>
</bean>
@Controller
@RequestMapping("/windData")
public class WindDataListener {
private final static ThLogger logger = Logger("WindDataDispatcher");
@Autowired
private ThreadPoolTaskExecutor controlerThreadPool;
@Autowired
private ThreadPoolTaskExecutor windDataThreadPool;
@Autowired
private WindDataRuntimeService runtimeService;
@Autowired
private MaintainAlarmSender maintainAlarmSender;
/**
* 启动调度
*/
@RequestMapping(value = "/receiveMsg", method = RequestMethod.GET)
java线程池创建的四种@ResponseBody
public void receiveMsg() {
final String paramLog = LogConst.BUSSINESS_NAME + LogConst.HTTP_API;
logger.info("[{}][接收到调度消息]", paramLog);
//定时调度,可能有多个http请求,把请求都放在controlerThreadPool⾥⾯
logger.info("[{}][响应给调度系统]", paramLog);
}
}
private ThreadPoolTaskExecutor taskThreadPool;
private WindDataRuntimeService runtimeService;
private MaintainAlarmSender maintainAlarmSender;
private Map<Object, Object> mdcMap;
public WindDataDispatcher(ThreadPoolTaskExecutor taskThreadPool, WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, this.taskThreadPool = taskThreadPool;
this.runtimeService = runtimeService;
this.maintainAlarmSender = maintainAlarmSender;
this.mdcMap = mdcMap;
}
@Override
public void run() {
if (null != mdcMap) {
MDC.setContextMap(mdcMap);
}
final String paramLog = LogConst.BUSSINESS_NAME + LogConst.DISPATCHER;
logger.info("[{}启动]", paramLog);
logger.info("[{}结束]", paramLog);
}
}
private WindDataRuntimeService runtimeService;
private MaintainAlarmSender maintainAlarmSender;
private Map<Object, Object> mdcMap;
public WindDataExecutor(WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, Map<Object, Object> mdcMap) { this.runtimeService = runtimeService;
this.maintainAlarmSender = maintainAlarmSender;
this.mdcMap = mdcMap;
}
@Override
public void run() {
if (null != mdcMap) {
MDC.setContextMap(mdcMap);
}
final String paramLog = LogConst.BUSSINESS_NAME + LogConst.EXECUTOR;
logger.info("[{}启动]", paramLog);
try {
} catch (Exception e) {
<("[{}异常]{}", new Object[]{paramLog, e});
maintainAlarmSender.sendMail(MaintainAlarmSender.DEFAULT_MAIL_SUB, paramLog + "异常:" + e);
}
logger.info("[{}结束]", paramLog);
}
}
线程池都有哪⼏种⼯作队列
1、ArrayBlockingQueue
是⼀个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进⾏排序。
2、LinkedBlockingQueue
⼀个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要⾼于ArrayBlockingQueue。静态⼯⼚⽅法wFixedThreadPool()使⽤了这个队列
3、SynchronousQueue
⼀个不存储元素的阻塞队列。每个插⼊操作必须等到另⼀个线程调⽤移除操作,否则插⼊操作⼀直处于阻塞状态,吞吐量通常要⾼于LinkedBlockingQueue,静态⼯⼚⽅法wCachedThreadPool使⽤了这个队列。
4、PriorityBlockingQueue
⼀个具有优先级的⽆限阻塞队列。
线程池中的⼏种重要的参数及流程说明
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论