阻塞队列之LinkedBlockingQueue
概述
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独⽴的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并⾏执⾏。LinkedBlockingQueue采⽤可重⼊锁(ReentrantLock)来保证在并发情况下的线程安全。
构造器
LinkedBlockingQueue⼀共有三个构造器,分别是⽆参构造器、可以指定容量的构造器、可以穿⼊⼀个容器的构造器。如果在创建实例的时候调⽤的是⽆参构造器,LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,这样做很可能会导致队列还没有满,但是内存却已经满了的情况(内存溢出)。
1public LinkedBlockingQueue();//设置容量为Integer.MAX
2
3public LinkedBlockingQueue(int capacity);//设置指定容量
4
5public LinkedBlockingQueue(Collection<? extends E> c);//穿⼊⼀个容器,如果调⽤该构造器,容量默认也是Integer.MAX_VALUE LinkedBlockingQueue常⽤操作
取数据
take():⾸选。当队列为空时阻塞
poll():弹出队顶元素,队列为空时,返回空
peek():和poll烈性,返回队队顶元素,但顶元素不弹出。队列为空时返回null
remove(Object o):移除某个元素,队列为空时抛出异常。成功移除返回true
添加数据
put():⾸选。队满是阻塞
offer():队满时返回false
判断队列是否为空
size()⽅法会遍历整个队列,时间复杂度为O(n),所以最好选⽤isEmtpy
put元素原理
基本过程:
1.判断元素是否为null,为null抛出异常
2.加锁(可中断锁)
3.判断队列长度是否到达容量,如果到达⼀直等待
4.如果没有队满,enqueue()在队尾加⼊元素
5.队列长度加1,此时如果队列还没有满,调⽤signal唤醒其他堵塞队列
1if (e == null) throw new NullPointerException();
2
3int c = -1;
4        Node<E> node = new Node<E>(e);
5final ReentrantLock putLock = this.putLock;
6final AtomicInteger count = unt;
7        putLock.lockInterruptibly();
8try {
9while (() == capacity) {
10                notFull.await();
11            }
12            enqueue(node);
13            c = AndIncrement();
14if (c + 1 < capacity)
15                notFull.signal();
16        } finally {
17            putLock.unlock();
18        }
take元素原理
基本过程:
1.加锁(依旧是ReentrantLock),注意这⾥的锁和写⼊是不同的两把锁
2.判断队列是否为空,如果为空就⼀直等待
3.通过dequeue⽅法取得数据
3.取⾛元素后队列是否为空,如果不为空唤醒其他等待中的队列
1public E take() throws InterruptedException {
2        E x;
3int c = -1;
4final AtomicInteger count = unt;
5final ReentrantLock takeLock = this.takeLock;
6        takeLock.lockInterruptibly();
7try {
8while (() == 0) {
9                notEmpty.await();
10            }
11            x = dequeue();
12            c = AndDecrement();
13if (c > 1)
14                notEmpty.signal();
15        } finally {
16            takeLock.unlock();
17        }
18if (c == capacity)
19            signalNotFull();
20return x;
21    }
enqueue()和dequeue()⽅法实现都⽐较简单,⽆⾮就是将元素添加到队尾,从队顶取⾛元素,感兴趣的朋友可以⾃⼰去看⼀下,这⾥就不粘贴了。
LinkedBlockingQueue与LinkedBlockingDeque⽐较
LinkedBlockingDeque和LinkedBlockingQueue的相同点在于:
1. 基于链表
2. 容量可选,不设置的话,就是Int的最⼤值
和LinkedBlockingQueue的不同点在于:
1. 双端链表和单链表
2. 不存在哨兵节点
3. ⼀把锁+两个条件
实例:
⼩记:AtomicInteger的getAndIncrment和getAndDcrement()等⽅法,这些⽅法分为两步,get和increment(decrement),在get和increment 中间可能有其他线程进⼊,导致多个线程get到的数值是相同的,也会导致多个线程累加后的值其实累加1.在这种情况下,使⽤volatile也是没有效果的,因为get之后没有对值进⾏修改,不能触发的效果。
1public class ProducerAndConsumer {
2public static void main(String[] args){
3
4try{
5            BlockingQueue queue = new LinkedBlockingQueue(5);
6
7            ExecutorService executor = wFixedThreadPool(5);
8            Produer producer = new Produer(queue);
9for(int i=0;i<3;i++){
10                ute(producer);
11            }
12            ute(new Consumer(queue));
13
14            executor.shutdown();
15        }catch (Exception e){
16            e.printStackTrace();
17        }
18
19    }
20 }
21
22class Produer implements  Runnable{
23
24private BlockingQueue queue;
25private int nums = 20;  //循环次数
26
27//标记数据编号
28private static volatile AtomicInteger count = new AtomicInteger();
29private boolean isRunning = true;
30public Produer(){}
31
32public Produer(BlockingQueue queue){
33this.queue = queue;
34    }
35
36public void run() {
37        String data = null;
38try{
39            System.out.println("开始⽣产数据");
40            System.out.println("-----------------------");
41
42while(nums>0){
43                nums--;
44                count.decrementAndGet();
block truncated45
46                Thread.sleep(500);
47                System.out.println(Thread.currentThread().getId()+ " :⽣产者⽣产了⼀个数据");
48                queue.AndIncrement());
49            }
50        }catch(Exception e){
51            e.printStackTrace();
52            Thread.currentThread().interrupt();
53        }finally{
54            System.out.println("⽣产者线程退出!");
55        }
56    }
57 }
58
59class Consumer implements Runnable{
60
61private BlockingQueue queue;
62private int nums = 20;
63private boolean isRunning = true;
64
65public Consumer(){}
66
67public Consumer(BlockingQueue queue){
68this.queue = queue;
69    }
70
71public void run() {
72
73        System.out.println("消费者开始消费");
74        System.out.println("-------------------------");
75
76while(nums>0){
77            nums--;
78try{
79while(isRunning){
80int data = (Integer)queue.take();
81                    Thread.sleep(500);
82                    System.out.println("消费者消费的数据是" + data);
83            }
84
85            }catch(Exception e){
86                e.printStackTrace();
87                Thread.currentThread().interrupt();
88            }finally {
89                System.out.println("消费者线程退出!");
90            }
91
92        }
93    }
94 }
效果:
1 1
2 :⽣产者⽣产了⼀个数据
2 11 :⽣产者⽣产了⼀个数据
3 13 :⽣产者⽣产了⼀个数据
4 12 :⽣产者⽣产了⼀个数据
5消费者消费的数据是-3
6 11 :⽣产者⽣产了⼀个数据
7 13 :⽣产者⽣产了⼀个数据
8 12 :⽣产者⽣产了⼀个数据
9消费者消费的数据是-3
10 13 :⽣产者⽣产了⼀个数据
11 11 :⽣产者⽣产了⼀个数据
12 12 :⽣产者⽣产了⼀个数据
13消费者消费的数据是-3
14 13 :⽣产者⽣产了⼀个数据
15 11 :⽣产者⽣产了⼀个数据
16消费者消费的数据是-3
17消费者消费的数据是-3
可以看到,有多个producer在⽣产数据的时候get到的是相同的值。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。