初識Parallel Extensions之PLINQ
今天我們就來談談平行擴展的關鍵組件之一PLINQ(Parallel LINQ)。微軟對PLINQ在Parallel FX中的定位是:PLINQ是TPL(Task Parallel Library)的一個高層應用。由于目前微軟對TPL研發的時間還比較短,這個社區預覽版的TPL版本的質量還是比較低的,而且微軟發布這個版本的目的也是為了更好的獲得開發社區的反饋信息,為了讓PLINQ有更高的質量,所以目前PLINQ還是基于ThreadPool的實現,而不是基于TPL的API的。不過這只是內部實現不同而已,以后正式發布的時候PLINQ的對外接口的變更應該不會太大。
如何使用PLINQ?
1 添加System.Threading.dll到引用中
2 通過調用System.Linq.ParallelQuery.AsParallel擴展方法,將數據封裝到IParallelEnumerable中。
基于聲明方式的數據并行性
調用AsParallel擴展方法可使得編譯器使用System.Linq.ParallelEnumerable版本的查詢運算符,而不是System.Linq.Enumerable的。熟悉LINQ的人都知道,查詢表達式在編譯時都將轉化成對擴展方法的調用。對于LINQ而言,所有的擴展方法都封裝在System.Linq.Enumerable靜態類中,該類定義的都是針對IEnumerable數據源的擴展方法。而對于PLINQ,所有的擴展方法都封裝在System.Linq.ParallelEnumerable靜態類中,而且該類針對的都是IParallelEnumerable數據源的擴展方法,是System.Linq.Enumerable靜態類擴展方法的鏡像,只不過是通過并行方式對查詢進行評估。IParallelEnumerable接口從IEnumerable接口繼承,所以PLINQ也具有LINQ的延遲執行的特點,以及執行foreach。
為了讓大家清晰知道系統是如何將使用System.Linq.Enumerable版本的查詢運算符變成System.Linq.ParallelEnumerable版本的,我們先來看看System.Linq.ParallelQuery.AsParallel方法:
public static IParallelEnumerable AsParallel(IEnumerable source)
很顯然這個方法就是將IEnumerable的數據源轉化成IParallelEnumerable以使得使用平行版本的運算。這就是平行架構中通過AsParallel的聲明來使用并行使用數據的方式,也是PLINQ的編程模型。
所以這種基于聲明方式的數據并行性使得從LINQ到PLINQ的轉化非常容易,例如我們有這樣的LINQ代碼片段:
string[] words = new[] { "Welcome", "to", "Beijing" };
(from word in words select Process(word)).ToArray();
我們很容易就可以將其變成PLINQ版本:
string[] words = new[] { "Welcome", "to", "Beijing" };
(from word in words.AsParallel() select Process(word)).ToArray();
當然,如果你是通過查詢操作(就是直接調用靜態擴展函數),而不是使用查詢表達式(有時候查詢表達式沒有提供相應的表達式語句,例如C#3.0中沒有提供Skip和Take相對應的查詢表達式語句,我們只能通過直接調用查詢操作函數)的情況下,將LINQ遷移到PLINQ,我們除了要調用AsParallel方法,還需要將直接調用Enumerable的方法改成對ParallelEnumerable的調用,例如:
IEnumerable data = ...;
var q = Enumerable.Select(Enumerable.OrderBy(
Enumerable.Where(data, (x) => p(x)),(x) => k(x)),(x) => f(x));
foreach (var e in q) a(e);
要使用 PLINQ,必須按如下方式重新編寫該查詢:
IEnumerable data = ...;
var q = ParallelEnumerable.Select(ParallelEnumerable.OrderBy(
ParallelEnumerable.Where(data.AsParallel(), (x) => p(x)),
(x) => k(x)),(x) => f(x));
foreach (var e in q) a(e);
注意:有些查詢運算符是二元的,使用兩個IEnumerable作為輸入參數(例如Join),最左邊數據源的類型決定了使用LINQ還是PLINQ,因此你只需要在第一個數據源上調用AsParallel便能使查詢并行查詢,例如:
IEnumerable leftData = ..., rightData = ...;
var q = from x in leftData.AsParallel()
join y in rightData on x.a == y.b
select f(x, y);