java创建线程池的4种⽅式之队例DelayedWorkQueue(⼋)
我们带着以下问题看如何实现的。
1. DelayedWorkQueue的数据结构是怎样的
2. DelayedWorkQueue如何进⾏⼊队出队
3. DelayedWorkQueue如何实现延迟取出队
问题分析
我们先看下DelayedWorkQueue类定义DelayedWorkQueue基于堆的数据结构,如DelayQueue和PriorityQueue中的数据结构,除了每个ScheduledFutureTask还将其索引记录到堆数组中。这消除了在取消时到任务的需要,⼤⼤加快了移除(从O(n)到O(log n)),并减少了垃圾保留,否则通过等待元素在清除之前升⾄顶部⽽发⽣垃圾保留。但是因为队列也可能包含不是ScheduledFutureTasks的RunnableScheduledFutures,所以我们不能保证有这样的索引可⽤,在这种情况下我们会回到线性搜索。(我们希望⼤多数任务都不会被装饰,⽽且更快的情况会更常见。)所有堆操作都必须记录索引更改 - 主要在siftUp和siftDown中。删除后,任务的heapIndex设置为-1。请注意,ScheduledFutureTasks最多可以出现在队列中⼀次(对于其他类型的任务或⼯作队列,这不⼀定是这样),因此由heapIndex
唯⼀标识。
根据翻译,我们知道DelayedWorkQueue基于堆的数据结构,对于堆的介绍虽然不是本⽂重点,但是由于后续的代码需要了解堆的存取,所以我们详细介绍下堆、以及如何进⾏数据的存取的
我们先来看下什么是堆
它是⼀个完全⼆叉树,即除了最后⼀层节点不是满的,其他层节点都是满的,即左右节点都有
它不是⼆叉搜索树,即左节点的值都⽐⽗节点值⼩,右节点的值都不⽐⽗节点值⼩,这样查的时候,就可以通过⼆分的⽅式,效率是(log N)。
它是特殊的⼆叉树,它要求⽗节点的值不能⼩于⼦节点的值。这样保证⼤的值在上⾯,⼩的值在下⾯。所以堆遍历和查都是低效的,因为我们只知道从根节点到⼦叶节点的每条路径都是降序的,但是各个路径之间都是没有联系的,查⼀个值时,你不知道应该从左节点查还是从右节点开始查
它可以实现快速的插⼊和删除,效率都在(log N)左右。所以它可以实现优先级队列
堆是⼀个⼆叉树,但是它最简单的⽅式是通过数组去实现⼆叉树,⽽且因为堆是⼀个完全⼆叉树,就不存在数组空间的浪费。怎么使⽤数组来存储⼆叉树呢?
就是⽤数组的下标来模拟⼆叉树的各个节点 ,⽐如说根节点就是0,第⼀层的左节点是1,右节点是2。由此我们可以得出下列公式:
// 对于n位置的节点来说:
int left = 2 * n + 1; // 左⼦节点
int right = 2 * n + 2; // 右⼦节点
int parent = (n - 1) / 2; // ⽗节点,当然n要⼤于0,根节点是没有⽗节点的
对于堆来说,只有两个操作,插⼊insert和删除remove,不管插⼊还是删除保证堆的成⽴条件,1.是完全⼆叉树,2.⽗节点的值不能⼩于⼦节点的值。
public void insert(int value) {
// 第⼀步将插⼊的值,直接放在最后⼀个位置。并将长度加⼀
store[size++] = value;
// 得到新插⼊值所在位置。
int index = size - 1;
while(index > 0) {
// 它的⽗节点位置坐标
int parentIndex = (index - 1) / 2;
// 如果⽗节点的值⼩于⼦节点的值,你不满⾜堆的条件,那么就交换值
if (store[index] > store[parentIndex]) {
swap(store, index, parentIndex);
index = parentIndex;
} else {
// 否则表⽰这条路径上的值已经满⾜降序,跳出循环
break;
}
}
}
主要步骤:
直接将value插⼊到size位置,并将size⾃增,这样store数组中插⼊⼀个值了。
要保证从这个叶节点到根节点这条路径上的节点,满⾜⽗节点的值不能⼩于⼦节点。
通过int parentIndex = (index - 1) / 2得到⽗节点,如果⽐⽗节点值⼤,那么两者位置的值交换,然后再拿这个⽗节点和它的⽗⽗节点⽐较。
直到这个节点值⽐⽗节点值⼩,或者这个节点已经是根节点就退出循环。
因为我们每次只插⼊⼀个值,所以只需要保证新插⼊位置的叶节点到根节点路径满⾜堆的条件,因为其他路径没做操作,肯定是满⾜条件的。第⼆因为是直接在size位置插⼊值,所以肯定满⾜是完全⼆叉树这个条件。因为每次循环index都是除以2这种倍数递减的⽅式,所以它最多循环次数是(log N)次。
public int remove() {
// 将根的值记录,最后返回
int result = store[0];
// 将最后位置的值放到根节点位置
store[0] = store[--size];
int index = 0;
// 通过循环,保证⽗节点的值不能⼩于⼦节点。
while(true) {
int leftIndex = 2 * index + 1; // 左⼦节点
int rightIndex = 2 * index + 2; // 右⼦节点
// leftIndex >= size 表⽰这个⼦节点还没有值。
if (leftIndex >= size) break;
int maxIndex = leftIndex;
if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
if (store[index] < store[maxIndex]) {
swap(store, index, maxIndex);
index = maxIndex;
} else {
break;
}
}
return result;
}
在堆中最⼤值就在根节点,所以操作步骤:
将根节点的值保存到result中。
将最后节点的值移动到根节点,再将长度减⼀,这样满⾜堆成⽴第⼀个条件,堆是⼀个完全⼆叉树。
使⽤循环,来满⾜堆成⽴的第⼆个条件,⽗节点的值不能⼩于⼦节点的值。
最后返回result。
那么怎么样满⾜堆的第⼆个条件呢?
因为根点的值现在是新值,那么就有可能⽐它的⼦节点⼩,所以就有可能要进⾏交换。
我们要出左⼦节点和右⼦节点那个值更⼤,因为这个值可能要和⽗节点值进⾏交换,如果它不是较⼤值的话,它和⽗节点进⾏交换之后,就会出现⽗节点的值⼩于⼦节点。
将到的较⼤⼦节点值和⽗节点值进⾏⽐较。
如果⽗节点的值⼩于它,那么将⽗节点和较⼤⼦节点值进⾏交换,然后再⽐较较⼤⼦节点和它的⼦节点。
如果⽗节点的值不⼩于⼦节点较⼤值,或者没有⼦节点(即这个节点已经是叶节点了),就跳出循环。
每次循环我们都是以2的倍数递增,所以它也是最多循环次数是(log N)次。
所以通过堆这种⽅式可以快速实现优先级队列,它的插⼊和删除操作的效率都是O(log N)。
以上是对堆结构的简单介绍,为什么要介绍堆呢,因为我们接下来要说的DelayedWorkQueue就是基于最⼩堆(即⽗节点⼩于等于⼦节点,根节点元素,是所以元素中最⼩的⼀个)来构造的,先看下DelayedWorkQueue的基本结构:
// 初始容量,当堆⼤⼩⼀溢出时,扩容按照1.5倍进⾏扩容
private static final int INITIAL_CAPACITY = 16;
// 堆存储数组,我们可以看到这⾥是通过数据实现对堆的存储,如果⽗节点下标为n,则左⼦节点下标为2*n+1, ⼜⼦节点是2*n+2
// 例如下标0的⼦节点分别为1和2
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 全局锁,⽤来对操作加锁
private final ReentrantLock lock = new ReentrantLock();
// 初始化堆⼤⼩,有别于堆容量,size⼩于等于queue.length
private int size = 0;
// 获取堆头结点的线程
private Thread leader = null;
// 维护⼀个等待条件,当新的节点变成头节点时,或⼀个新线程成为leader时,进⾏singal
private final Condition available = wCondition();
DelayedWorkQueue是⽤数组来储存队列中的元素,那么我们看看它是怎么实现优先级队列的。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 当k==0时,就到了堆⼆叉树的根节点了,跳出循环
while (k > 0) {
// ⽗节点位置坐标, 相当于(k - 1) / 2
int parent = (k - 1) >>> 1;
// 获取⽗节点位置元素
RunnableScheduledFuture<?> e = queue[parent];
// 如果key元素⼤于⽗节点位置元素,满⾜条件,那么跳出循环
// 因为是从⼩到⼤排序的。
if (keypareTo(e) >= 0)
break;
// 否则就将⽗节点元素存放到k位置
queue[k] = e;
// 这个只有当元素是ScheduledFutureTask对象实例才有⽤,⽤来快速取消任务。
setIndex(e, k);
// 重新赋值k,寻元素key应该插⼊到堆⼆叉树的那个节点
k = parent;
}
// 循环结束,k就是元素key应该插⼊的节点位置
queue[k] = key;
setIndex(key, k);
}
通过循环,来查元素key应该插⼊在堆⼆叉树那个节点位置,并交互⽗节点的位置。具体流程在前⾯已经介绍过了。移除元素排序siftDown⽅法
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
// 通过循环,保证⽗节点的值不能⼩于⼦节点。
while (k < half) {
// 左⼦节点, 相当于 (k * 2) + 1
int child = (k << 1) + 1;
// 左⼦节点位置元素
RunnableScheduledFuture<?> c = queue[child];
// 右⼦节点, 相当于 (k * 2) + 2
int right = child + 1;
// 如果左⼦节点元素值⼤于右⼦节点元素值,那么右⼦节点才是较⼩值的⼦节点。
/
/ 就要将c与child值重新赋值
if (right < size && cpareTo(queue[right]) > 0)
c = queue[chil
d = right];
// 如果⽗节点元素值⼩于较⼩的⼦节点元素值,那么就跳出循环
if (keypareTo(c) <= 0)
break;
java线程池创建的四种// 否则,⽗节点元素就要和⼦节点进⾏交换
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
通过循环,保证⽗节点的值不能⼩于⼦节点。
插⼊元素⽅法
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
我们发现与普通阻塞队列相⽐,这三个添加⽅法都是调⽤offer⽅法。那是因为它没有队列已满的条件,也就是说可以不断地向DelayedWorkQueue添加元素,当元素个数超过数组长度时,会进⾏数组扩容。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 使⽤lock保证并发操作安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果要超过数组长度,就要进⾏数组扩容
if (i >= queue.length)
// 数组扩容
grow();
// 将队列中元素个数加⼀
size = i + 1;
// 如果是第⼀个元素,那么就不需要排序,直接赋值就⾏了
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 调⽤siftUp⽅法,使插⼊的元素变得有序。
siftUp(i, e);
}
// 表⽰新插⼊的元素是队列头,更换了队列头,
// 那么就要唤醒正在等待获取任务的线程。
if (queue[0] == e) {
leader = null;
/
/ 唤醒正在等待等待获取任务的线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
主要是三步:
元素个数超过数组长度,就会调⽤grow()⽅法,进⾏数组扩容。
将新元素e添加到优先级队列中对应的位置,通过siftUp⽅法,保证按照元素的优先级排序。
如果新插⼊的元素是队列头,即更换了队列头,那么就要唤醒正在等待获取任务的线程。这些线程可能是因为原队列头元素的延时时间没到,⽽等待的
数组扩容⽅法
private void grow() {
int oldCapacity = queue.length;
// 每次扩容增加原来数组的⼀半数量。
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
// 使⽤pyOf来复制⼀个新数组
queue = pyOf(queue, newCapacity);
}
⽴即获取队列头元素

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。