文章出處

隊列(Queue)與棧(Stack)是數據結構中的二種常用結構,隊列的特點是先進先出(First In First Out),而Stack是先進后出(First In Last Out),說得通俗點:Queue就是電影院入場時人們排起來的進場隊伍,先來的人(即:前排在前面的人)先入場,而Statck則是一隊人依次進入了一個死胡同想出來,先進去(最里面)的人,必須等后面的人(后進入的人)出來了,自己才能出來。

隊列在多線程應用中,常用于生產-消費場景,打個通俗的比方:很多人早上喜歡去買油條,買油條的人相當于消費者,做油條的師傅則是生產者。而油鍋邊上用于放油條的鐵架子,可以看成一個共享的隊列,師傅做好油條后,一根一根的撈出來放在架子上,而顧客則按排隊的順序一根根的付好錢從架子上拿。 即:隊列的一頭,不斷有人在放入東西(生產元素),另一頭不斷有人的消費(拿走元素)。這里就有一個很有趣的現象,如果買的人多,師傅來不及做,那么第一個顧客就會一直等著(后面的所有人也得等著,或稱為阻塞了后面的人),直到師傅炸好一根,然后第一個顧客買完走了,后面的人才能頂上來,類似的道理,如果架子放滿了,沒有人來買,師傅就會停下來,等有人來買了,才會繼續做,這就是所謂的隊列阻塞,而能產生阻塞行為的隊列稱為阻塞隊列。

從剛才的描述可以看出,發生阻塞起碼得滿足下面至少一個條件: (前提:隊列是有界的)

1.從隊列里取元素時,如果隊列為空,則代碼一直等在這里(即阻塞),直到隊列里有東西了,拿到元素了,后面的代碼才能繼續

2.向隊列里放元素時,如果隊列滿了(即放不下更多元素),則代碼也會卡住,直到隊列里的東西被取走了(即:有空位可以放新元素了),后面的代碼才能繼續

JDK7提供了以下7個阻塞隊列:

ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue :支持優先級排序的無界阻塞隊列。
DelayQueue:使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue:不存儲元素的阻塞隊列。
LinkedTransferQueue:鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:鏈表結構組成的雙向阻塞隊列。

阻塞隊列提供了下列四種處理方法:

方法\處理方式拋出異常返回true/false一直阻塞超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek()    

這4類方法中,在隊列已滿(或為空)的情況下,有些會拋出異常,有些則返回true/false,有些則一直阻塞,還有些則可以設置超時時間,時間到了后,自動退出阻塞狀態,實際項目中可根據需要選取適合的方法。

 

下面是一個基本示例:

模擬了買油條的場景,1個老板在做油條,3個顧客在排隊買

package yjmyzz.test;

import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueTest {

    private static final int queueSize = 3;
    private static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(queueSize);
    private static final int produceSpeed = 2000;//生產速度(越小越快)
    private static final int consumeSpeed = 10;//消費速度(越小越快)

    public static void main(String[] args) {
        Thread producer = new Producer();
        Thread consumer = new Consumer();
        producer.start();
        consumer.start();
    }

    static class Producer extends Thread {
        public void run() {
            while (true) {
                try {
                    System.out.println("老板準備炸油條了,架子上還能放:" + (queueSize - queue.size()) + "根油條");
                    queue.put("1根油條");
                    System.out.println("老板炸好了1根油條,架子上還能放:" + (queueSize - queue.size()) + "根油條");
                    Thread.sleep(produceSpeed);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread {
        public void run() {
            while (true) {
                try {
                    System.out.println("A 準備買油條了,架子上還剩" + queue.size() + "根油條");
                    queue.take();
                    System.out.println("A 買到1根油條,架子上還剩" + queue.size() + "根油條");
                    Thread.sleep(consumeSpeed);

                    System.out.println("B 準備買油條了,架子上還剩" + queue.size() + "根油條");
                    queue.take();
                    System.out.println("B 買到1根油條,架子上還剩" + queue.size() + "根油條");
                    Thread.sleep(consumeSpeed);

                    System.out.println("C 準備買油條了,架子上還剩" + queue.size() + "根油條");
                    queue.take();
                    System.out.println("C 買到1根油條,架子上還剩" + queue.size() + "根油條");
                    Thread.sleep(consumeSpeed);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

輸出:(只取了前幾行)

 1 老板準備炸油條了,架子上還能放:3根油條
 2 老板炸好了1根油條,架子上還能放:2根油條
 3 A 準備買油條了,架子上還剩1根油條
 4 A 買到1根油條,架子上還剩0根油條
 5 B 準備買油條了,架子上還剩0根油條
 6 老板準備炸油條了,架子上還能放:3根油條
 7 老板炸好了1根油條,架子上還能放:2根油條
 8 B 買到1根油條,架子上還剩0根油條
 9 C 準備買油條了,架子上還剩0根油條
10 ...

觀察5-8行,因為消費的速度遠大于生產速度,即:生意太好,老板來不及做。B要買油條時,發現賣完了,只能等老板再做一根出來,后面的C才有機會繼續購買。

這是一個很有意思的程序,可以把生產速度與消費速度值對換,輸出結果如下:

 1 老板準備炸油條了,架子上還能放:3根油條
 2 老板炸好了1根油條,架子上還能放:2根油條
 3 A 準備買油條了,架子上還剩1根油條
 4 A 買到1根油條,架子上還剩0根油條
 5 老板準備炸油條了,架子上還能放:3根油條
 6 老板炸好了1根油條,架子上還能放:2根油條
 7 老板準備炸油條了,架子上還能放:2根油條
 8 老板炸好了1根油條,架子上還能放:1根油條
 9 老板準備炸油條了,架子上還能放:1根油條
10 老板炸好了1根油條,架子上還能放:0根油條
11 老板準備炸油條了,架子上還能放:0根油條
12 B 準備買油條了,架子上還剩3根油條
13 B 買到1根油條,架子上還剩2根油條
14 老板炸好了1根油條,架子上還能放:0根油條
15 老板準備炸油條了,架子上還能放:0根油條
16 C 準備買油條了,架子上還剩3根油條
17 C 買到1根油條,架子上還剩2根油條
18 老板炸好了1根油條,架子上還能放:0根油條
19 老板準備炸油條了,架子上還能放:0根油條
20 A 準備買油條了,架子上還剩3根油條
21 A 買到1根油條,架子上還剩2根油條
22 老板炸好了1根油條,架子上還能放:0根油條
23 老板準備炸油條了,架子上還能放:0根油條

因為生產速度遠大于消費速度,即:生意不好,油條做得比賣得快。觀察一下5-11行,老板卯足了勁做,但是這時一直沒人來買。然后12-13行,終于來了一個客戶B買了一根,然后老板又要開始做(14-15行),發現架子上放滿了,不得不停下,等C再買一根(16-17行),才能繼續做(18行)

 

實現原理:

聊聊并發(七)——Java中的阻塞隊列 一文中已經對ArrayBlockingQueue的源碼進行比較詳細的分析了,這里只貼幾段主要的代碼,體會一下思想:

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

這3個變量很重要,ReentrantLock重入鎖,notEmpty檢查不為空的Condition 以及  notFull用來檢查隊列未滿的Condition

Condition是一個接口,里面有二個重要的方法:
await() : Causes the current thread to wait until it is signalled or interrupted. 即阻塞當前線程,直到被通知(喚醒)或中斷

singal(): Wakes up one waiting thread. 喚醒阻塞的線程

再來看put方法:(jdk 1.8)

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

1.先獲取鎖

2.然后用while循環檢測元素個數是否等于items長度,如果相等,表示隊列滿了,調用notFull的await()方法阻塞線程

3.否則調用enqueue()方法添加元素

4.最后解鎖

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

這是添加元素的代碼(jdk 1.8),注意最后一行notEmpty.signal()方法,表示添加完元素后,調用singal()通知等待(從隊列中取元素)的線程,隊列不空(有值)啦,可以來取東西了。

類似的take()與dequeue()方法則相當于逆過程(注:同樣都是jdk 1.8)

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

類似的:

1. 先加鎖

2. 如果元素個數為空,表示隊列已空,調用notEmpty的await()阻塞線程,直接隊列里又有新元素加入為止

3. 然后調用dequeue 從隊列里刪除元素

4. 解鎖

dequeue方法:

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

倒數第2行,元素移除后,調用notFull.singnal喚醒等待(向隊列添加元素的)線程,隊列有空位了,可以向里面添加元素了。

參考文章:

http://ifeve.com/java-blocking-queue/

http://www.cnblogs.com/dolphin0520/p/3932906.html


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()