文章出處

最近把《java并發編程實戰》-Java Consurrency in Practice 重溫了一遍,把書中提到的一些常用工具記錄于此:

一、閉鎖(門栓)- CountDownLatch

適用場景:多線程測試時,通常為了精確計時,要求所有線程都ready后,才開始執行,防止有線程先起跑,造成不公平,類似的,所有線程執行完,整個程序才算運行完成。

    /**
     * 閉鎖測試(菩提樹下的楊過 http://yjmyzz.cnblogs.com/)
     *
     * @throws InterruptedException
     */
    @Test
    public void countdownLatch() throws InterruptedException {
        CountDownLatch startLatch = new CountDownLatch(1); //類似發令槍
        CountDownLatch endLatch = new CountDownLatch(10);//這里的數量,要與線程數相同

        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(() -> {
                try {
                    startLatch.await(); //先等著,直到發令槍響,防止有線程先run
                    System.out.println(Thread.currentThread().getName() + " is running...");
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endLatch.countDown(); //每個線程執行完成后,計數
                }
            });
            t.setName("線程-" + i);
            t.start();
        }
        long start = System.currentTimeMillis();
        startLatch.countDown();//發令槍響,所有線程『開跑』
        endLatch.await();//等所有線程都完成
        long end = System.currentTimeMillis();
        System.out.println("done! exec time => " + (end - start) + " ms");
    }  

執行結果:

線程-1 is running...
線程-5 is running...
線程-8 is running...
線程-4 is running...
線程-3 is running...
線程-0 is running...
線程-2 is running...
線程-9 is running...
線程-7 is running...
線程-6 is running...
done! exec time => 13 ms

注:大家可以把第14行注釋掉,再看看運行結果有什么不同。

 

二、信號量(Semaphore)

適用場景:用于資源數有限制的并發訪問場景。

   public class BoundedHashSet<T> {
        private final Set<T> set;
        private final Semaphore semaphore;

        public BoundedHashSet(int bound) {
            this.set = Collections.synchronizedSet(new HashSet<T>());
            this.semaphore = new Semaphore(bound);
        }

        public boolean add(T t) throws InterruptedException {
            if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
                return false;
            }
            ;
            boolean added = false;
            try {
                added = set.add(t);
                return added;
            } finally {
                if (!added) {
                    semaphore.release();
                }
            }
        }

        public boolean remove(Object o) {
            boolean removed = set.remove(o);
            if (removed) {
                semaphore.release();
            }
            return removed;
        }
    }

    @Test
    public void semaphoreTest() throws InterruptedException {

        BoundedHashSet<String> set = new BoundedHashSet<>(5);
        for (int i = 0; i < 6; i++) {
            if (set.add(i + "")) {
                System.out.println(i + " added !");
            } else {
                System.out.println(i + " not add to Set!");
            }
        }
    }

上面的示例將一個普通的Set變成了有界容器。執行結果如下:

0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!

 

三、柵欄CyclicBarrier 

這個跟閉鎖類似,可以通過代碼設置一個『屏障』點,其它線程到達該點后才能繼續,常用于約束其它線程都到達某一狀態后,才允許做后面的事情。

    public class Worker extends Thread {

        private CyclicBarrier cyclicBarrier;

        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        private void step1() {
            System.out.println(this.getName() + " step 1 ...");
        }

        private void step2() {
            System.out.println(this.getName() + " step 2 ...");
        }

        public void run() {
            step1();
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            step2();
        }
    }

    @Test
    public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker(cyclicBarrier);
            w.start();
        }
        cyclicBarrier.await();

    }

這里我們假設有一個worder線程,里面有2步操作,要求所有線程完成step1后,才能繼續step2. 執行結果如下:

Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...

 

四、Exchanger

如果2個線程需要交換數據,Exchanger就能派上用場了,見下面的示例:

    @Test
    public void exchangerTest() {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread t1 = new Thread(() -> {
            String temp = "AAAAAA";
            System.out.println("thread 1 交換前:" + temp);
            try {
                temp = exchanger.exchange(temp);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread 1 交換后:" + temp);
        });

        Thread t2 = new Thread(() -> {
            String temp = "BBBBBB";
            System.out.println("thread 2 交換前:" + temp);
            try {
                temp = exchanger.exchange(temp);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread 2 交換后:" + temp);
        });

        t1.start();
        t2.start();
    }

 執行結果:

thread 1 交換前:AAAAAA
thread 2 交換前:BBBBBB
thread 2 交換后:AAAAAA
thread 1 交換后:BBBBBB

 

五、FutureTask/Future

一些很耗時的操作,可以用Future轉化成異步,不阻塞后續的處理,直到真正需要返回結果時調用get拿到結果

    @Test
    public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException {

        Callable<String> callable = () -> {
            System.out.println("很耗時的操作處理中。。。");
            Thread.sleep(5000);
            return "done";
        };

        FutureTask<String> futureTask = new FutureTask<>(callable);

        System.out.println("就緒。。。");
        new Thread(futureTask).start();
        System.out.println("主線程其它處理。。。");
        System.out.println(futureTask.get());
        System.out.println("處理完成!");

        System.out.println("-----------------");

        System.out.println("executor 就緒。。。");
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(callable);
        System.out.println(future.get(10, TimeUnit.SECONDS));
    }

 執行結果:

就緒。。。
主線程其它處理。。。
很耗時的操作處理中。。。
done
處理完成!
-----------------
executor 就緒。。。
很耗時的操作處理中。。。
done

 

六、阻塞隊列BlockingQueue

阻塞隊列可以在線程間實現生產者-消費者模式。比如下面的示例:線程producer模擬快速生產數據,而線程consumer模擬慢速消費數據,當達到隊列的上限時(即:生產者產生的數據,已經放不下了),隊列就堵塞住了。

@Test
    public void blockingQueueTest() throws InterruptedException {
        final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(5);

        Thread producer = new Thread() {
            public void run() {
                Random rnd = new Random();
                while (true) {
                    try {
                        int i = rnd.nextInt(10000);
                        blockingDeque.put(i + "");
                        System.out.println(this.getName() + " 產生了一個數字:" + i);
                        Thread.sleep(rnd.nextInt(50));//模擬生產者快速生產
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        producer.setName("producer 1");


        Thread consumer = new Thread() {
            public void run() {
                while (true) {
                    Random rnd = new Random();
                    try {

                        String i = blockingDeque.take();
                        System.out.println(this.getName() + " 消費了一個數字:" + i);
                        Thread.sleep(rnd.nextInt(10000));//消費者模擬慢速消費
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        consumer.setName("consumer 1");

        producer.start();
        consumer.start();

        while (true) {
            Thread.sleep(100);
        }
    }

執行結果:

producer 1 產生了一個數字:6773
consumer 1 消費了一個數字:6773
producer 1 產生了一個數字:4456
producer 1 產生了一個數字:8572
producer 1 產生了一個數字:5764
producer 1 產生了一個數字:2874
producer 1 產生了一個數字:780 # 注意這里就已經堵住了,直到有消費者消費一條數據,才能繼續生產
consumer 1 消費了一個數字:4456
producer 1 產生了一個數字:4193


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()