线程池ThreadPoolExcutor详解
Java线程池概述
线程池技术在并发时经常会使⽤到,java中的线程池的使⽤是通过调⽤ThreadPoolExecutor来实现的。ThreadPoolExecutor提供了四个构造函数,最后都会归结于下⾯这个构造⽅法:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @param handler the handler to use when execution is blocked
*        because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
*        {@code corePoolSize < 0}<br>
*        {@code keepAliveTime < 0}<br>
*        {@code maximumPoolSize <= 0}<br>
*        {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
*        or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
这些参数的意义如下:
corePoolSize:该线程池中核⼼线程数最⼤值
maximumPoolSize:该线程池中线程总数最⼤值
keepAliveTime:该线程池中⾮核⼼线程闲置超时时长
unit:keepAliveTime的单位
workQueue:阻塞队列BlockingQueue,维护着等待执⾏的Runnable对象
threadFactory:创建线程的接⼝,需要实现他的Thread newThread(Runnable r)⽅法。
RejectedExecutionHandler:饱和策略,最⼤线程和⼯作队列容量且已经饱和时execute⽅法都将调⽤RejectedExecutionHandler ThreadPoolExecutor⼯作流程
⼤致过程陈述为:
1. 向线程池中添加任务,当任务数量少于corePoolSize时,会⾃动创建thead来处理这些任务;
2. 当添加任务数⼤于corePoolSize且少于maximmPoolSize时,不在创建线程,⽽是将这些任务放到阻塞队列中,等待被执⾏;
3. 接上⾯2的条件,且当阻塞队列满了之后,继续创建thread,从⽽加速处理阻塞队列;
4. 当添加任务⼤于maximmPoolSize时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是AbortPolicy(直接丢弃)。
线程池中使⽤的阻塞队列
ArrayBlockingQueue:基于数组结构的有界阻塞队列,构造函数⼀定要传⼤⼩,FIFO(先进先出);
LinkedBlockingQueue:⽆界,默认⼤⼩65536(Integer.MAX_VALUE),当⼤量请求任务时,容易造成内存耗尽。SynchronousQueue:同步队列,是⼀个特殊的BlockingQueue,它没有容量(这是因为在SynchronousQueue中,插⼊将等待另⼀个线程的删除操作,反之亦然)。具体可以参考:《Java SynchronousQueue Examples(译)》
PriorityBlockingQueue: 优先队列,⽆界。DelayedWorkQueue:这个队列接收到任务时,⾸先先⼊队,只有达到了指定的延时时间,才会执⾏任务
阻塞队列常见的⽅法如下表所⽰:
常见四种线程池
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
newScheduledThreadPool
它们通过Executors以静态⽅法的⽅式直接调⽤,实质上是它们最终调⽤的是ThreadPoolExecutor的构造⽅法,也就是本⽂最前⾯那段代码。
注:KeepAliveTime=0的话,表⽰不等待
摘⾃阿⾥巴巴开发⼿册:
【强制】线程池不允许使⽤ Executors 去创建,⽽是通过 ThreadPoolExecutor 的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积⼤量的请求,从⽽导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建⼤量的线程,从⽽导致OOM。
应⽤样例,更多请参看
package multiThread;
import urrent.ArrayBlockingQueue;
import urrent.ThreadPoolExecutor;
import urrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
1, 2, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy());
System.out.println("getQueue:" + Queue().size());
System.out.println("remainingCapacity:" + Queue().remainingCapacity());
try {
int count = 0;
Thread.currentThread().setName("aa");
while (count <= 10) {
System.out.println(Thread.currentThread().getName() + "getQueue:" + Queue().size());
System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
java线程池创建的四种Thread.sleep(1000);
count++;
}
} catch (Exception e) {
e.printStackTrace();
}
});
try {
int count = 0;
Thread.currentThread().setName("bbb");
while (count <= 100) {
System.out.println(Thread.currentThread().getName() + "getQueue:" + Queue().size());                    System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
Thread.sleep(1000);
count++;
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。