4种线程池和7种并发队列
⼀、7种并发队列
Java并发包中的阻塞队列⼀共7个,当然他们都是线程安全的。
java线程池创建的四种阻塞队列。
有界阻塞队列。
ArrayBlockingQueue:⼀个由数组结构组成的
ArrayBlockingQueue:⼀个由数组结构组成的有界
LinkedBlockingQueue:⼀个由链表结构组成的⽆界阻塞队列。
PriorityBlockingQueue:⼀个⽀持优先级排序的⽆界阻塞队列。
DealyQueue:⼀个使⽤优先级(启动时间)队列实现的⽆界阻塞队列。
SynchronousQueue:⼀个不存储元素的阻塞队列。
LinkedTransferQueue:⼀个由链表结构组成的⽆界阻塞队列。
LinkedBlockingDeque:⼀个由链表结构组成的双向阻塞队列。
常⽤的只有三个,重点是前两个
LinkedBlockingQueue和ArrayBlockingQueue区别:
1、LinkedBlockingQueue内部由两个ReentrantLock来实现出⼊队列的线程安全,由各⾃的Condition对象的await和signal来实现等待和唤醒功能。⽽ArrayBlockingQueue的只使⽤⼀个ReentrantLock管理进出队列。
⽽LinkedBlockingQueue实现的队列中的锁是分离的,其添加采⽤的是putLock,移除采⽤的则是takeLock,这样能⼤⼤提⾼队列的吞吐量,也意味着在⾼并发的情况下⽣产者和消费者可以并⾏地操作队列中的数据,以此来提⾼整个队列的并发性能。
2、队列⼤⼩有所不同,ArrayBlockingQueue是有界的初始化必须指定⼤⼩,⽽LinkedBlockingQueue可以是有界的也可以是⽆界的(Integer.MAX_VALUE),对于后者⽽⾔,当添加速度⼤于移除速度时,在⽆界的情况下,可能会造成内存溢出等问题。
3、数据存储容器不同,ArrayBlockingQueue采⽤的是数组作为数据存储容器,⽽LinkedBlockingQueue采⽤的则是以Node节点作为连接对象的链表。
由于ArrayBlockingQueue采⽤的是数组的存储容器,因此在插⼊或删除元素时不会产⽣或销毁任何额外的对象实例,⽽LinkedBlockingQueue 则会⽣成⼀个额外的Node对象。这可能在长时间内需要⾼效并发地处理⼤批量数据的时,对于GC可能存在较⼤影响。
SynchronousQueue
没有容量,是⽆缓冲等待队列,是⼀个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素
使⽤SynchronousQueue阻塞队列⼀般要求maximumPoolSizes为⽆界,避免线程拒绝执⾏操作。
⼆、4种线程池
1、newfixed,线程默认⼤⼩确定、最⼤⼤⼩确定(实际没什么⽤),默认使⽤linkedblockqueue,⽆尽队列
危害在于这个等待队列,队列如果消费不及时不断膨胀可以使机器资源耗尽
ArrayBlockingQueue是⼀个有界缓存等待队列,可以指定缓存队列的⼤⼩,当正在执⾏的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执⾏,当ArrayBlockingQueue已满时,加⼊ArrayBlockingQueue失败,会开启新的线程去执⾏,当线程数已经达到最⼤的maximumPoolSizes时,再有新的元素尝试加⼊ArrayBlockingQueue时会报错。
2、cached,线程数不限⼤⼩
危害 本⾝就是没有限制,有多少请求创建多少线程,直到资源耗尽
CachedThreadPool使⽤没有容量的SynchronousQueue作为主线程池的⼯作队列,它是⼀个没有容量的阻塞队列。每个插⼊操作必须等待另⼀个线程的对应移除操作。这意味着,如果主线程提交任务的速度⾼于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程⽽耗尽CPU资源。
3、single
可顺序执⾏任务,同时只有⼀个线程处理(单线程)
执⾏过程如下:
1.如果当前⼯作中的线程数量少于corePool的数量,就创建⼀个新的线程来执⾏任务。
2.当线程池的⼯作中的线程数量达到了corePool,则将任务加⼊LinkedBlockingQueue。
3.线程执⾏完1中的任务后会从队列中去任务。
注意:由于在线程池中只有⼀个⼯作线程,所以任务可以按照添加顺序执⾏。
4、ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
使⽤了延迟队列,⽆界,和cached类似
如果运⾏的线程达到了corePoolSize,就把任务添加到任务队列DelayedWorkQueue中;DelayedWorkQueue会将任务排序,先执⾏的任务放在队列的前⾯。
任务执⾏完后,ScheduledFutureTask中的变量time改为下次要执⾏的时间,并放回到DelayedWorkQueue中
当⽣产者线程调⽤put之类的⽅法加⼊元素时,DelayQueue是⼀个没有边界BlockingQueue实现,加⼊其中的元素必需实现Delayed接⼝。当⽣产者线程调⽤put之类的⽅法加⼊元素时,会触发Delayed接⼝中的compareTo⽅法进⾏排序,也就是说队列中元素的顺序是按到期时间排序的,⽽⾮它们进⼊队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
消费者线程查看队列头部的元素,注意是查看不是取出。然后调⽤元素的getDelay⽅法,如果此⽅法返回的值⼩0或者等于0,则消费者线程会从队列中取出此元素,并进⾏处理。如果getDelay⽅法返回的值⼤于0,则消费者线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。
DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,⽽不是长时间的等待。消费者线程尽量把时间花在处理任务上,最⼩化空等的时间,以提⾼线程的利⽤效率。
重要---线程池参数
⽆论创建那种线程池 必须要调⽤ThreadPoolExecutor,以上四种都是调⽤这个实现的
线程池类为 urrent.ThreadPoolExecutor,常⽤构造⽅法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最⼤数量
unit: 线程池维护线程所允许的空闲时间的单位
keepAliveTime: 线程池维护线程所允许的空闲时间 ,
keepAliveTime: 线程池维护线程所允许的空闲时间 ,unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使⽤的缓冲队列
handler: 线程池对拒绝任务的处理策略 --饱和策略
-------------------------------------------------------------------
通⽤流程:
⼀个任务通过 execute(Runnable)⽅法被添加到线程池,任务就是⼀个 Runnable类型的对象,任务的执⾏⽅法就是 Runnable类型对象的
run()⽅法。
当⼀个任务通过execute(Runnable)⽅法欲添加到线程池时:
如果此时线程池中的数量⼩于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放⼊缓冲队列。
如果此时线程池中的数量⼤于corePoolSize,缓冲队列workQueue满,并且线程池中的数量⼩于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量⼤于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核⼼线程corePoolSize、任务队列workQueue、最⼤线程maximumPoolSize,如果三者都满了,使⽤handler处理被拒绝的任务。
当线程池中的线程数量⼤于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终⽌。这样,线程池可以动态的调整池中的线程数。
---------------------------------------------------------------------------------------------
unit可选的参数为urrent.TimeUnit中的⼏个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常⽤的是:urrent.ArrayBlockingQueue
ArrayBlockingQueue
handler有四个选择:
1、【抛异常】ThreadPoolExecutor.AbortPolicy() 抛出urrent.RejectedExecutionException异常
2、【重试】ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会⾃动重复调⽤execute()⽅法ThreadPoolExecutor.DiscardOldestPolicy()
3、【个旧的停了】抛弃旧的任务 ThreadPoolExecutor.DiscardPolicy()
4、【不抛异常】抛弃当前的任务
饱和策略,如记录⽇志或持久化存储不能处理的任务。
5、当然也可以根据应⽤场景实现RejectedExecutionHandler接⼝,⾃定义饱和策略
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论