文章出處

當我們通過Executor提交一組并發執行的任務,并且希望在每一個任務完成后能立即得到結果,有兩種方式可以采取:

 

方式一:

通過一個list來保存一組future,然后在循環中輪訓這組future,直到每個future都已完成。如果我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,那么在調用get方式時,需要將超時時間設置為0 

Java代碼  
  1. public class CompletionServiceTest {  
  2.   
  3.     static class Task implements Callable<String>{  
  4.         private int i;  
  5.           
  6.         public Task(int i){  
  7.             this.i = i;  
  8.         }  
  9.   
  10.         @Override  
  11.         public String call() throws Exception {  
  12.             Thread.sleep(10000);  
  13.             return Thread.currentThread().getName() + "執行完任務:" + i;  
  14.         }     
  15.     }  
  16.       
  17.     public static void main(String[] args){  
  18.         testUseFuture();  
  19.     }  
  20.       
  21.     private static void testUseFuture(){  
  22.         int numThread = 5;  
  23.         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
  24.         List<Future<String>> futureList = new ArrayList<Future<String>>();  
  25.         for(int i = 0;i<numThread;i++ ){  
  26.             Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
  27.             futureList.add(future);  
  28.         }  
  29.                   
  30.         while(numThread > 0){  
  31.             for(Future<String> future : futureList){  
  32.                 String result = null;  
  33.                 try {  
  34.                     result = future.get(0, TimeUnit.SECONDS);  
  35.                 } catch (InterruptedException e) {  
  36.                     e.printStackTrace();  
  37.                 } catch (ExecutionException e) {  
  38.                     e.printStackTrace();  
  39.                 } catch (TimeoutException e) {  
  40.                     //超時異常直接忽略  
  41.                 }  
  42.                 if(null != result){  
  43.                     futureList.remove(future);  
  44.                     numThread--;  
  45.                     System.out.println(result);  
  46.                     //此處必須break,否則會拋出并發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)  
  47.                     break;  
  48.                 }  
  49.             }  
  50.         }  
  51.     }  
  52. }  

 方式二:

第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達到代碼最簡化的效果。

Java代碼  
  1. public class CompletionServiceTest {  
  2.   
  3.     static class Task implements Callable<String>{  
  4.         private int i;  
  5.           
  6.         public Task(int i){  
  7.             this.i = i;  
  8.         }  
  9.   
  10.         @Override  
  11.         public String call() throws Exception {  
  12.             Thread.sleep(10000);  
  13.             return Thread.currentThread().getName() + "執行完任務:" + i;  
  14.         }     
  15.     }  
  16.       
  17.     public static void main(String[] args) throws InterruptedException, ExecutionException{  
  18.         testExecutorCompletionService();  
  19.     }  
  20.       
  21.     private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
  22.         int numThread = 5;  
  23.         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
  24.         CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
  25.         for(int i = 0;i<numThread;i++ ){  
  26.             completionService.submit(new CompletionServiceTest.Task(i));  
  27.         }  
  28. }  
  29.           
  30.         for(int i = 0;i<numThread;i++ ){       
  31.             System.out.println(completionService.take().get());  
  32.         }  
  33.           
  34.     }  

 

ExecutorCompletionService分析:

 CompletionService是Executor和BlockingQueue的結合體。

Java代碼  
  1. public ExecutorCompletionService(Executor executor) {  
  2.         if (executor == null)  
  3.             throw new NullPointerException();  
  4.         this.executor = executor;  
  5.         this.aes = (executor instanceof AbstractExecutorService) ?  
  6.             (AbstractExecutorService) executor : null;  
  7.         this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
  8.     }  

 任務的提交和執行都是委托給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture,

Java代碼  
  1. public Future<V> submit(Callable<V> task) {  
  2.         if (task == null) throw new NullPointerException();  
  3.         RunnableFuture<V> f = newTaskFor(task);  
  4.         executor.execute(new QueueingFuture(f));  
  5.         return f;  
  6.     }  

 QueueingFuture是FutureTask的一個子類,通過改寫該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。

 

Java代碼  
  1. private class QueueingFuture extends FutureTask<Void> {  
  2.         QueueingFuture(RunnableFuture<V> task) {  
  3.             super(task, null);  
  4.             this.task = task;  
  5.         }  
  6.         protected void done() { completionQueue.add(task); }  
  7.         private final Future<V> task;  
  8.     }  

 而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。

Java代碼  
  1. public Future<V> take() throws InterruptedException {  
  2.     return completionQueue.take();  
  3. }  
  4.   
  5. public Future<V> poll() {  
  6.     return completionQueue.poll();  
  7. 原文:http://xw-z1985.iteye.com/blog/1997077

文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()