隊列(Queue)與棧(Stack)是數據結構中的二種常用結構,隊列的特點是先進先出(First In First Out),而Stack是先進后出(First In Last Out),說得通俗點:Queue就是電影院入場時人們排起來的進場隊伍,先來的人(即:前排在前面的人)先入場,而Statck則是一隊人依次進入了一個死胡同想出來,先進去(最里面)的人,必須等后面的人(后進入的人)出來了,自己才能出來。
隊列在多線程應用中,常用于生產-消費場景,打個通俗的比方:很多人早上喜歡去買油條,買油條的人相當于消費者,做油條的師傅則是生產者。而油鍋邊上用于放油條的鐵架子,可以看成一個共享的隊列,師傅做好油條后,一根一根的撈出來放在架子上,而顧客則按排隊的順序一根根的付好錢從架子上拿。 即:隊列的一頭,不斷有人在放入東西(生產元素),另一頭不斷有人的消費(拿走元素)。這里就有一個很有趣的現象,如果買的人多,師傅來不及做,那么第一個顧客就會一直等著(后面的所有人也得等著,或稱為阻塞了后面的人),直到師傅炸好一根,然后第一個顧客買完走了,后面的人才能頂上來,類似的道理,如果架子放滿了,沒有人來買,師傅就會停下來,等有人來買了,才會繼續做,這就是所謂的隊列阻塞,而能產生阻塞行為的隊列稱為阻塞隊列。
從剛才的描述可以看出,發生阻塞起碼得滿足下面至少一個條件: (前提:隊列是有界的)
1.從隊列里取元素時,如果隊列為空,則代碼一直等在這里(即阻塞),直到隊列里有東西了,拿到元素了,后面的代碼才能繼續
2.向隊列里放元素時,如果隊列滿了(即放不下更多元素),則代碼也會卡住,直到隊列里的東西被取走了(即:有空位可以放新元素了),后面的代碼才能繼續
JDK7提供了以下7個阻塞隊列:
ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue :支持優先級排序的無界阻塞隊列。
DelayQueue:使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue:不存儲元素的阻塞隊列。
LinkedTransferQueue:鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:鏈表結構組成的雙向阻塞隊列。
阻塞隊列提供了下列四種處理方法:
方法\處理方式 | 拋出異常 | 返回true/false | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() |
這4類方法中,在隊列已滿(或為空)的情況下,有些會拋出異常,有些則返回true/false,有些則一直阻塞,還有些則可以設置超時時間,時間到了后,自動退出阻塞狀態,實際項目中可根據需要選取適合的方法。
下面是一個基本示例:
模擬了買油條的場景,1個老板在做油條,3個顧客在排隊買
package yjmyzz.test; import java.util.concurrent.ArrayBlockingQueue; public class BlockingQueueTest { private static final int queueSize = 3; private static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(queueSize); private static final int produceSpeed = 2000;//生產速度(越小越快) private static final int consumeSpeed = 10;//消費速度(越小越快) public static void main(String[] args) { Thread producer = new Producer(); Thread consumer = new Consumer(); producer.start(); consumer.start(); } static class Producer extends Thread { public void run() { while (true) { try { System.out.println("老板準備炸油條了,架子上還能放:" + (queueSize - queue.size()) + "根油條"); queue.put("1根油條"); System.out.println("老板炸好了1根油條,架子上還能放:" + (queueSize - queue.size()) + "根油條"); Thread.sleep(produceSpeed); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer extends Thread { public void run() { while (true) { try { System.out.println("A 準備買油條了,架子上還剩" + queue.size() + "根油條"); queue.take(); System.out.println("A 買到1根油條,架子上還剩" + queue.size() + "根油條"); Thread.sleep(consumeSpeed); System.out.println("B 準備買油條了,架子上還剩" + queue.size() + "根油條"); queue.take(); System.out.println("B 買到1根油條,架子上還剩" + queue.size() + "根油條"); Thread.sleep(consumeSpeed); System.out.println("C 準備買油條了,架子上還剩" + queue.size() + "根油條"); queue.take(); System.out.println("C 買到1根油條,架子上還剩" + queue.size() + "根油條"); Thread.sleep(consumeSpeed); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
輸出:(只取了前幾行)
1 老板準備炸油條了,架子上還能放:3根油條 2 老板炸好了1根油條,架子上還能放:2根油條 3 A 準備買油條了,架子上還剩1根油條 4 A 買到1根油條,架子上還剩0根油條 5 B 準備買油條了,架子上還剩0根油條 6 老板準備炸油條了,架子上還能放:3根油條 7 老板炸好了1根油條,架子上還能放:2根油條 8 B 買到1根油條,架子上還剩0根油條 9 C 準備買油條了,架子上還剩0根油條 10 ...
觀察5-8行,因為消費的速度遠大于生產速度,即:生意太好,老板來不及做。B要買油條時,發現賣完了,只能等老板再做一根出來,后面的C才有機會繼續購買。
這是一個很有意思的程序,可以把生產速度與消費速度值對換,輸出結果如下:
1 老板準備炸油條了,架子上還能放:3根油條 2 老板炸好了1根油條,架子上還能放:2根油條 3 A 準備買油條了,架子上還剩1根油條 4 A 買到1根油條,架子上還剩0根油條 5 老板準備炸油條了,架子上還能放:3根油條 6 老板炸好了1根油條,架子上還能放:2根油條 7 老板準備炸油條了,架子上還能放:2根油條 8 老板炸好了1根油條,架子上還能放:1根油條 9 老板準備炸油條了,架子上還能放:1根油條 10 老板炸好了1根油條,架子上還能放:0根油條 11 老板準備炸油條了,架子上還能放:0根油條 12 B 準備買油條了,架子上還剩3根油條 13 B 買到1根油條,架子上還剩2根油條 14 老板炸好了1根油條,架子上還能放:0根油條 15 老板準備炸油條了,架子上還能放:0根油條 16 C 準備買油條了,架子上還剩3根油條 17 C 買到1根油條,架子上還剩2根油條 18 老板炸好了1根油條,架子上還能放:0根油條 19 老板準備炸油條了,架子上還能放:0根油條 20 A 準備買油條了,架子上還剩3根油條 21 A 買到1根油條,架子上還剩2根油條 22 老板炸好了1根油條,架子上還能放:0根油條 23 老板準備炸油條了,架子上還能放:0根油條
因為生產速度遠大于消費速度,即:生意不好,油條做得比賣得快。觀察一下5-11行,老板卯足了勁做,但是這時一直沒人來買。然后12-13行,終于來了一個客戶B買了一根,然后老板又要開始做(14-15行),發現架子上放滿了,不得不停下,等C再買一根(16-17行),才能繼續做(18行)
實現原理:
聊聊并發(七)——Java中的阻塞隊列 一文中已經對ArrayBlockingQueue的源碼進行比較詳細的分析了,這里只貼幾段主要的代碼,體會一下思想:
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
這3個變量很重要,ReentrantLock重入鎖,notEmpty檢查不為空的Condition 以及 notFull用來檢查隊列未滿的Condition
Condition是一個接口,里面有二個重要的方法:
await() : Causes the current thread to wait until it is signalled or interrupted. 即阻塞當前線程,直到被通知(喚醒)或中斷
singal(): Wakes up one waiting thread. 喚醒阻塞的線程
再來看put方法:(jdk 1.8)
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
1.先獲取鎖
2.然后用while循環檢測元素個數是否等于items長度,如果相等,表示隊列滿了,調用notFull的await()方法阻塞線程
3.否則調用enqueue()方法添加元素
4.最后解鎖
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
這是添加元素的代碼(jdk 1.8),注意最后一行notEmpty.signal()方法,表示添加完元素后,調用singal()通知等待(從隊列中取元素)的線程,隊列不空(有值)啦,可以來取東西了。
類似的take()與dequeue()方法則相當于逆過程(注:同樣都是jdk 1.8)
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
類似的:
1. 先加鎖
2. 如果元素個數為空,表示隊列已空,調用notEmpty的await()阻塞線程,直接隊列里又有新元素加入為止
3. 然后調用dequeue 從隊列里刪除元素
4. 解鎖
dequeue方法:
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
倒數第2行,元素移除后,調用notFull.singnal喚醒等待(向隊列添加元素的)線程,隊列有空位了,可以向里面添加元素了。
參考文章:
文章列表