线程池如何复⽤⼀个线程--ThreadPoolExecutor的实现(未完)任务是⼀组逻辑⼯作单元,⽽线程则是使任务异步执⾏的机制。在Java中,Runnable对象代表⼀个任务,Thread对象负责创建⼀个线程执⾏这个任务。
前提:1. 程序需要处理⼤量任务
2. 任务的执⾏时间相对创建线程和销毁的时间较短
⽅法1:
while (ture) {
Socket connection = socket.accept();
handleTask(connection); //单线程处理所⽤任务
⽅法2:
while (true) {
final Socket connection = socket.accept();
new Thread(() -> {handleTask(connection);}).start(); 每个任务⼀个线程处理
}
两种⽅法的缺点:1. 串⾏执⾏的问题在于响应慢,在多核处理器上吞吐量⼩。
2. ⽆限创建线程导致资源消耗⼤,可创建线程数有限制,恶意提交任务会导致程序负载过⾼。
Java的第3种解决⽅案是线程池,它是兼顾资源和并发的处理⽅案。
线程池的关键是任务的执⾏策略,复⽤线程。
执⾏策略定义了:
~ 任务在哪个线程上运⾏
~ 任务按照什么顺序执⾏(FIFO, LIFO, 优先级)
~ 有多少个任务可以并发执⾏
~ 队列中有多少个任务等待执⾏
~ 系统由于过载⽽需要拒绝⼀个任务时,选择哪个任务,如何通过程序有任务被拒绝
~ 任务执⾏前后,应该进⾏哪些动作。
线程池的优势:
1. 重⽤线程,减少了创建和销毁线程的开销,在Window和Linux,Java使⽤⼀个线程对应⼀个轻量级进程的实现。
2. 某些情况下,任务到达时,如果有空闲线程,可以⽴即执⾏任务,⽽不需要等待创建新线程,提⾼响应速度。
3. 线程池的⼤⼩可以调节,以便处理器保持忙碌状态,提⾼效率。
4. 通过将任务的提交和执⾏分离,可以根据硬件资源选择最佳的执⾏策略。
线程池的实现:
在urrent包中,ThreadPoolExecutor是⼀个线程池实现。
图中的三个接⼝:
Executor:⼀个运⾏新任务的简单接⼝;public interface Executor {
/**
* 在未来的某个时间执⾏任务command. 这个任务可能在⼀个新的线程中执⾏,
* 或者在线程池中的线程执⾏,或者⼀个调⽤线程中执⾏(即在main中运⾏)。
* 取决于线程池的具体实现
* @param需要执⾏的任务
*/
void execute(Runnable command);
}
ExecutorService:扩展Executor接⼝
public interface ExecutorService extends Executor {
void shutdown(); // 停⽌接受任务。
List<Runnable> shutdownNow(); // 尝试停⽌所有活动的任务
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ScheduledExecutorService:扩展了ExecutorService。⽀持Future和定期执⾏任务。
ThreadPoolExceutor源码:
1. 关键field
// 线程池状态初始化为处于运⾏状态,线程数为0.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 储存运⾏状态和线程数(刚创建的线程处理RUNNING,0个活动线程。private static final int COUNT_BITS = Integer.SIZE - 3; // Integer.SIZE = 32 COUNT_BITS = 29
// 线程池的线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 0x00011111 1111 1111 1111 1111 1111 1111
// 运⾏状态储存在⾼位中
private static final int RUNNING = -1 << COUNT_BITS; // 0x1110 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0x0000 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS; // 0x0010 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS; // 0x0100 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS; // 0x0110 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取运⾏状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 获取线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取运⾏状态和线程数
线程池的五种状态:
RUNNING: 运⾏状态,接受新提交的任务,并且可以处理队列中的任务。
SHUTDOWN: 关闭状态,不再接受新提交的任务,可以处理队列中的任务。在线程池处于
RUNNING状态时,调⽤shutdown()⽅法会使线程池进⼊到此状态。
STOP:停⽌状态,不再接受新提交的任务,也不处理队列中的任务,并且中断运⾏中的任务。
在RUNNING或SHUTDOWN状态时,调⽤shutdownNow()⽅法使线程池进⼊到该状态。
TIDYING:所有任务都⼰终⽌,workerCount为0, 转换到TIDYING状态的线程池将运⾏terminate()⽅法。
TERMINATE: 终⽌状态,terminated()⽅法调⽤后进⼊此状态。
private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<>();
private volatile ThreadFactory threadFactory;
workQueue⽤于保存任务
workers⽤于保存⼯作线程
threadFactory⽤于⽣产线程。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = SecurityManager() == null ?
null : Context();
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = Nanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize: 线程池核⼼线程数量,作⽤:
* 当新任务在execute()提交时,根据corePoolSize去判断任务如何执⾏:
-> 如果线程池中线程⼩于corePoolSize,创建新的线程处理此任务,即使线程池中有空闲线程
-> 如果线程池中线程数⼤于等于corePoolSize且workQueue未满时,任务添加到workQueue中。
-> 如果线程池中线程数⼤于等于corePoolSize且workQueue⼰满时,创建新线程处理任务。
-> 如果线程池中线程数⼤于等于maximunPoolSize,且workQueue已满时,通过指定的handler策略处理任务(有四种已定义的策略)maximumPoolSize:最⼤线程数量
workQueue:未处理的任务队列
keepAliveTime:当线程池中的线程数量⼤于corePoolSize的时候,多余的线程不会⽴即销毁,⽽是会等待,
直到等待的时间超过了 keepAliveTime
threadFactory:⽤于创建新线程,通过ThreadFactory可以
handler:当线程池的的线程数等于maximumPoolSize且workQueue⼰满时,使⽤handler处理新提交的线程
execute⽅法,实现执⾏任务的逻辑。
public void execute(Runnable command) {java线程池创建的四种
if (command == null)
throw new NullPointerException();
/*
* 分3步处理任务:
* 1.线程数⼩于corePoolSize时,创建新线程执⾏此任务。
* addWorker ⽅法⾃动检查runState和workerCount
* addWorker因为不能添加线程时,返回false
*
* 2. 如果可以添加到队列, 我们仍要再次检查线程池的状态
* (因为线程池中可能没有线程或者在进⼊此⽅法时,线程池被关闭了。
* 如果线程池不处于RUNNING,清除刚才添加的任务
* 如果处于RUNNING且workerCount=0,创建新线程。
*
* 3.如果不能将任务添加到队列, 就尝试创建⼀个新的线程。如果创建失败,拒绝任务
*/
int c = (); // 获取线程池的运⾏状态和线程数
if (workerCountOf(c) < corePoolSize) { // 如果线程数⼩于corePoolSize时
// 第⼆个参数表⽰,限制添加线程的数量由谁决定
// true 由corePoolSize
// false 由maximumPoolSize
if (addWorker(command, true))
return; // 添加成功
c = (); // 添加线程失败时,重新获取运⾏状态和线程数
}
// 线程池处于RUNNING状态且任务成功添加到workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = (); // 重新获取运⾏状态和线程数,
// 重新判断运⾏状态,如果不是处于运⾏状态,移除上⾯调⽤workQueue.offer(command)时,添加的任务。
// 并且拒绝此任务
i f (! isRunning(recheck) && remove(command))
reject(command);
// 如果处于运⾏状态,且线程数为0时
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 在线程中添加线程去执⾏此任务。
}
// 如果线程池不是运⾏状态或者添加任务失败,且创建线程的失败时,
else if (!addWorker(command, false))
reject(command); // 拒绝此任务
}
addWrok⽅法,实现增加线程的逻辑
检查是否可以根据当前线程池状态和给定边界(核⼼或最⼤线程数)添加新⼯作线程。如果是,则相
应地调整⼯作线程的计数,并且如果可能,创建并启动新⼯作程序,将firstTask作为其第⼀个任务运⾏。如果线程池已停⽌或可以关闭,则此⽅法返回false。如果线程⼯⼚在询问时⽆法创建线程,它也会返回false。如果线程创建失败,或者由于线程⼯⼚返回null,或者由于异常(通常是Thread.start()中的OutOfMemoryError)会回滚
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ();
int rs = runStateOf(c); //获取运⾏状态
// 运⾏状态为RUNNING时,可以执⾏任务,跳出if
// 处于SHUTDOWN状态,fisrtTask为null,workQueue不为空时,跳出if
if (rs >= SHUTDOWN && // 不处于RUNNING,继续判断,否则跳出if
!
(rs == SHUTDOWN && // 如果处于SHUTDOWN状态,继续判断,否则返回false
firstTask == null && // fisrtTask为空,继续判断,否则返回false
! workQueue.isEmpty())) // workQueue为空时,返回false,否则跳出if
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || // ⼯作线程数⼤于CAPACITY时,返回false
wc >= (core ? corePoolSize : maximumPoolSize)) // ⼩于CAPACITY,⼤于core或max时,返回false
return false;
if (compareAndIncrementWorkerCount(c)) // 如果添加线程成功,跳出第⼀个for循环
break retry;
c = (); // Re-rea
d ctl 添加失败,重新读取状态
if (runStateOf(c) != rs) // 不是RUNNING状态,重新跑到第⼀个for循环。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // ⽤firstTask创建⼀个新的Worker
final Thread t = w.thread; // Worker利⽤ThreadFactory创建⼀个线程对象储存在其实例thread中if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = ());
// 如果处于运⾏状态,或者处于关闭状态但任务为null时
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 判断线程是否处于已经调⽤start(),如果是,抛出异常
throw new IllegalThreadStateException();
workers.add(w); // 将⼯作线程添加到Hashset中
int s = workers.size();
if (s > largestPoolSize) // 记录线程池中出现过最⼤的线程数
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程, t.start()调⽤的是Worker的run⽅法,见worker的构造器。
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论