最近把《java并發編程實戰》-Java Consurrency in Practice 重溫了一遍,把書中提到的一些常用工具記錄于此:
一、閉鎖(門栓)- CountDownLatch
適用場景:多線程測試時,通常為了精確計時,要求所有線程都ready后,才開始執行,防止有線程先起跑,造成不公平,類似的,所有線程執行完,整個程序才算運行完成。
/** * 閉鎖測試(菩提樹下的楊過 http://yjmyzz.cnblogs.com/) * * @throws InterruptedException */ @Test public void countdownLatch() throws InterruptedException { CountDownLatch startLatch = new CountDownLatch(1); //類似發令槍 CountDownLatch endLatch = new CountDownLatch(10);//這里的數量,要與線程數相同 for (int i = 0; i < 10; i++) { Thread t = new Thread(() -> { try { startLatch.await(); //先等著,直到發令槍響,防止有線程先run System.out.println(Thread.currentThread().getName() + " is running..."); Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { endLatch.countDown(); //每個線程執行完成后,計數 } }); t.setName("線程-" + i); t.start(); } long start = System.currentTimeMillis(); startLatch.countDown();//發令槍響,所有線程『開跑』 endLatch.await();//等所有線程都完成 long end = System.currentTimeMillis(); System.out.println("done! exec time => " + (end - start) + " ms"); }
執行結果:
線程-1 is running...
線程-5 is running...
線程-8 is running...
線程-4 is running...
線程-3 is running...
線程-0 is running...
線程-2 is running...
線程-9 is running...
線程-7 is running...
線程-6 is running...
done! exec time => 13 ms
注:大家可以把第14行注釋掉,再看看運行結果有什么不同。
二、信號量(Semaphore)
適用場景:用于資源數有限制的并發訪問場景。
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore semaphore; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); this.semaphore = new Semaphore(bound); } public boolean add(T t) throws InterruptedException { if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) { return false; } ; boolean added = false; try { added = set.add(t); return added; } finally { if (!added) { semaphore.release(); } } } public boolean remove(Object o) { boolean removed = set.remove(o); if (removed) { semaphore.release(); } return removed; } } @Test public void semaphoreTest() throws InterruptedException { BoundedHashSet<String> set = new BoundedHashSet<>(5); for (int i = 0; i < 6; i++) { if (set.add(i + "")) { System.out.println(i + " added !"); } else { System.out.println(i + " not add to Set!"); } } }
上面的示例將一個普通的Set變成了有界容器。執行結果如下:
0 added !
1 added !
2 added !
3 added !
4 added !
5 not add to Set!
三、柵欄CyclicBarrier
這個跟閉鎖類似,可以通過代碼設置一個『屏障』點,其它線程到達該點后才能繼續,常用于約束其它線程都到達某一狀態后,才允許做后面的事情。
public class Worker extends Thread { private CyclicBarrier cyclicBarrier; public Worker(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } private void step1() { System.out.println(this.getName() + " step 1 ..."); } private void step2() { System.out.println(this.getName() + " step 2 ..."); } public void run() { step1(); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } step2(); } } @Test public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException { CyclicBarrier cyclicBarrier = new CyclicBarrier(11); for (int i = 0; i < 10; i++) { Worker w = new Worker(cyclicBarrier); w.start(); } cyclicBarrier.await(); }
這里我們假設有一個worder線程,里面有2步操作,要求所有線程完成step1后,才能繼續step2. 執行結果如下:
Thread-0 step 1 ...
Thread-1 step 1 ...
Thread-2 step 1 ...
Thread-3 step 1 ...
Thread-4 step 1 ...
Thread-5 step 1 ...
Thread-6 step 1 ...
Thread-7 step 1 ...
Thread-8 step 1 ...
Thread-9 step 1 ...
Thread-9 step 2 ...
Thread-0 step 2 ...
Thread-3 step 2 ...
Thread-4 step 2 ...
Thread-6 step 2 ...
Thread-2 step 2 ...
Thread-1 step 2 ...
Thread-8 step 2 ...
Thread-7 step 2 ...
Thread-5 step 2 ...
四、Exchanger
如果2個線程需要交換數據,Exchanger就能派上用場了,見下面的示例:
@Test public void exchangerTest() { Exchanger<String> exchanger = new Exchanger<>(); Thread t1 = new Thread(() -> { String temp = "AAAAAA"; System.out.println("thread 1 交換前:" + temp); try { temp = exchanger.exchange(temp); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread 1 交換后:" + temp); }); Thread t2 = new Thread(() -> { String temp = "BBBBBB"; System.out.println("thread 2 交換前:" + temp); try { temp = exchanger.exchange(temp); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread 2 交換后:" + temp); }); t1.start(); t2.start(); }
執行結果:
thread 1 交換前:AAAAAA
thread 2 交換前:BBBBBB
thread 2 交換后:AAAAAA
thread 1 交換后:BBBBBB
五、FutureTask/Future
一些很耗時的操作,可以用Future轉化成異步,不阻塞后續的處理,直到真正需要返回結果時調用get拿到結果
@Test public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException { Callable<String> callable = () -> { System.out.println("很耗時的操作處理中。。。"); Thread.sleep(5000); return "done"; }; FutureTask<String> futureTask = new FutureTask<>(callable); System.out.println("就緒。。。"); new Thread(futureTask).start(); System.out.println("主線程其它處理。。。"); System.out.println(futureTask.get()); System.out.println("處理完成!"); System.out.println("-----------------"); System.out.println("executor 就緒。。。"); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(callable); System.out.println(future.get(10, TimeUnit.SECONDS)); }
執行結果:
就緒。。。
主線程其它處理。。。
很耗時的操作處理中。。。
done
處理完成!
-----------------
executor 就緒。。。
很耗時的操作處理中。。。
done
六、阻塞隊列BlockingQueue
阻塞隊列可以在線程間實現生產者-消費者模式。比如下面的示例:線程producer模擬快速生產數據,而線程consumer模擬慢速消費數據,當達到隊列的上限時(即:生產者產生的數據,已經放不下了),隊列就堵塞住了。
@Test public void blockingQueueTest() throws InterruptedException { final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(5); Thread producer = new Thread() { public void run() { Random rnd = new Random(); while (true) { try { int i = rnd.nextInt(10000); blockingDeque.put(i + ""); System.out.println(this.getName() + " 產生了一個數字:" + i); Thread.sleep(rnd.nextInt(50));//模擬生產者快速生產 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; producer.setName("producer 1"); Thread consumer = new Thread() { public void run() { while (true) { Random rnd = new Random(); try { String i = blockingDeque.take(); System.out.println(this.getName() + " 消費了一個數字:" + i); Thread.sleep(rnd.nextInt(10000));//消費者模擬慢速消費 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }; consumer.setName("consumer 1"); producer.start(); consumer.start(); while (true) { Thread.sleep(100); } }
執行結果:
producer 1 產生了一個數字:6773
consumer 1 消費了一個數字:6773
producer 1 產生了一個數字:4456
producer 1 產生了一個數字:8572
producer 1 產生了一個數字:5764
producer 1 產生了一個數字:2874
producer 1 產生了一個數字:780 # 注意這里就已經堵住了,直到有消費者消費一條數據,才能繼續生產
consumer 1 消費了一個數字:4456
producer 1 產生了一個數字:4193
文章列表