這篇文章分別從線程池大小參數的設置、工作線程的創建、空閑線程的回收、阻塞隊列的使用、任務拒絕策略、線程池Hook等方面來了解線程池的使用,其中涉及到一些細節包括不同參數、不同隊列、不同拒絕策略的選擇、產生的影響和行為、為更好的使用線程池奠定知識基礎,其中值得注意的部分我用粗體標識。
ExecutorService基于池化的線程來執行用戶提交的任務,通常可以簡單的通過Executors提供的工廠方法來創建ThreadPoolExecutor實例。
線程池解決的兩個問題:1)線程池通過減少每次做任務的時候產生的性能消耗來優化執行大量的異步任務的時候的系統性能。2)線程池還提供了限制和管理批量任務被執行的時候消耗的資源、線程的方法。另外ThreadPoolExecutor還提供了簡單的統計功能,比如當前有多少任務被執行完了。
快速開始
為了使得線程池適合大量不同的應用上下文環境,ThreadPoolExecutor提供了很多可以配置的參數和可被用來擴展的鉤子。然而,用戶還可以通過使用Executors提供的一些工廠方法來快速創建ThreadPoolExecutor實例。比如:
- 使用Executors#newCachedThreadPool可以快速創建一個擁有自動回收線程功能且沒有限制的線程池。
- 使用Executors#newFixedThreadPool可以用來創建一個固定線程大小的線程池。
- 使用Executors#newSingleThreadExecutor可以用來創建一個單線程的執行器。
如果上面的方法創建的實例不能滿足我們的需求,我們可以自己通過參數來配置,實例化一個實例。
關于線程數大小參數設置需要知道的
ThreadPoolExecutor會根據corePoolSize和maximumPoolSize來動態調整線程池的大小:poolSize。
當任務通過executor提交給線程池的時候,我們需要知道下面幾個點:
- 如果這個時候當前池子中的工作線程數小于corePoolSize,則新創建一個新的工作線程來執行這個任務,不管工作線程集合中有沒有線程是處于空閑狀態。
- 如果池子中有比corePoolSize大的但是比maximumPoolSize小的工作線程,任務會首先被嘗試著放入隊列,這里有兩種情況需要單獨說一下:a、如果任務被成功的放入隊列,則看看是否需要開啟新的線程來執行任務,只有當當前工作線程數為0的時候才會創建新的線程,因為之前的線程有可能因為都處于空閑狀態或因為工作結束而被移除。
b、如果放入隊列失敗,則才會去創建新的工作線程。
- 如果corePoolSize和maximumPoolSize相同,則線程池的大小是固定的。
- 通過將maximumPoolSize設置為無限大,我們可以得到一個無上限的線程池。
- 除了通過構造參數設置這幾個線程池參數之外我們還可以在運行時設置。
核心線程WarmUp
默認情況下,核心工作線程值在初始的時候被創建,當新任務來到的時候被啟動,但是我們可以通過重寫prestartCoreThread或prestartCoreThreads方法來改變這種行為。通常場景我們可以在應用啟動的時候來WarmUp核心線程,從而達到任務過來能夠立馬執行的結果,使得初始任務處理的時間得到一定優化。
定制工作線程的創建
新的線程是通過ThreadFactory來創建的,如果沒有指定,默認的Executors#defaultThreadFactory將被使用,這個時候創建的線程將都屬于同一個線程組,擁有同樣的優先級和daemon狀態。擴展配置ThreadFactory,我們可以配置線程的名字、線程組合daemon狀態。如果調用ThreadFactory#createThread的時候失敗,將返回null,executor將不會執行任何任務。
空閑線程回收
如果當前池子中的工作線程數大于corePoolSize,如果超過這個數字的線程處于空閑的時間大于keepAliveTime,則這些線程將會被終止,這是一種減少不必要資源消耗的策略。這個參數可以在運行時被改變,我們同樣可以將這種策略應用給核心線程,我們可以通過調用allowCoreThreadTimeout來實現。
選擇合適的阻塞隊列
所有的阻塞隊列都可以被用來存放任務,但是使用不同的隊列針對corePoolSize會表現不同的行為:
當池中工作線程數小于corePoolSize的時候,每次來任務的時候都會創建一個新的工作線程。
當池中工作線程數大于等于corePoolSize的時候,每次任務來的時候都會首先嘗試將線程放入隊列,而不是直接去創建線程。
如果放入隊列失敗,且當先池中線程數小于maximumPoolSize的時候,則會創建一個工作線程。
下面主要是不同隊列策略表現:
直接遞交:一種比較好的默認選擇是使用SynchronousQueue,這種策略會將提交的任務直接傳送給工作線程,而不持有。如果當前沒有工作線程來處理,即任務放入隊列失敗,則根據線程池的實現,會引發新的工作線程創建,因此新提交的任務會被處理。這種策略在當提交的一批任務之間有依賴關系的時候避免了鎖競爭消耗。值得一提的是,這種策略最好是配合unbounded線程數來使用,從而避免任務被拒絕。同時我們必須要考慮到一種場景,當任務到來的速度大于任務處理的速度,將會引起無限制的線程數不斷的增加。
無界隊列:使用無界隊列如LinkedBlockingQueue沒有指定最大容量的時候,將會引起當核心線程都在忙的時候,新的任務被放在隊列上,因此,永遠不會有大于corePoolSize的線程被創建,因此maximumPoolSize參數將失效。這種策略比較適合所有的任務都不相互依賴,獨立執行。舉個例子,如網頁服務器中,每個線程獨立處理請求。但是當任務處理速度小于任務進入速度的時候會引起隊列的無限膨脹。
有界隊列:有界隊列如ArrayBlockingQueue幫助限制資源的消耗,但是不容易控制。隊列長度和maximumPoolSize這兩個值會相互影響,使用大的隊列和小maximumPoolSize會減少CPU的使用、操作系統資源、上下文切換的消耗,但是會降低吞吐量,如果任務被頻繁的阻塞如IO線程,系統其實可以調度更多的線程。使用小的隊列通常需要大maximumPoolSize,從而使得CPU更忙一些,但是又會增加降低吞吐量的線程調度的消耗。總結一下是IO密集型可以考慮多些線程來平衡CPU的使用,CPU密集型可以考慮少些線程減少線程調度的消耗。
選擇適合的拒絕策略
當新的任務到來的而線程池被關閉的時候,或線程數和隊列已經達到上限的時候,我們需要去做一個決定,怎么拒絕這些任務。下面介紹一下常用的策略:
ThreadPoolExecutor#AbortPolicy:這個策略直接拋出RejectedExecutionException異常。
ThreadPoolExecutor#CallerRunsPolicy:這個策略將會使用Caller線程來執行這個任務,這是一種feedback策略,可以降低任務提交的速度。
ThreadPoolExecutor#DiscardPolicy:這個策略將會直接丟棄任務。
ThreadPoolExecutor#DiscardOldestPolicy:這個策略將會把任務隊列頭部的任務丟棄,然后重新嘗試執行,如果還是失敗則繼續實施策略。
除了上面的幾種策略,我們也可以通過實現RejectedExecutionHandler來實現自己的策略。
利用Hook嵌入你的行為
ThreadPoolExecutor提供了protected類型可以被覆蓋的鉤子方法,允許用戶在任務執行之前會執行之后做一些事情。我們可以通過它來實現比如初始化ThreadLocal、收集統計信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個Hook可以用來在任務被執行完的時候讓用戶插入邏輯,如rerminated。
如果hook方法執行失敗,則內部的工作線程的執行將會失敗或被中斷。
可訪問的隊列
getQueue方法可以用來訪問queue隊列以進行一些統計或者debug工作,我們不建議用作其他用途。同時remove方法和purge方法可以用來將任務從隊列中移除。
關閉線程池
當線程池不在被引用并且工作線程數為0的時候,線程池將被終止。我們也可以調用shutdown來手動終止線程池。如果我們忘記調用shutdown,為了讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達到目的。
寫在最后
JAVA本身提供的API已經可以讓我們快速的進行基于線程池的多線程開發,但是我們必須要為我們寫的代碼負責,每一個參數的設置和策略的選擇跟不同應用場景有絕對的關系。然而對于不同參數和不同策略的選擇并不是一件容易的事情,我們必須要先回答一些基礎問題:每創建一個線程,操作系統為我們做了哪些事情,這個線程的操作系統資源消耗主要在哪部分?假如我的應用場景是IO密集型的,那么我需要更多的線程還是更少的線程?假如我們的CPU操作和IO操作大概各占一半的話我們又需要如何選擇?等等一些列問題。我認為、多線程開發是一件很容易的事情也是一件很不容易的事情。
文章列表
留言列表