4.0中的并行計算和多線程詳解(一)
并行計算部分
沿用微軟的寫法,System.Threading.Tasks.::.Parallel類,提供對并行循環和區域的支持。 我們會用到的方法有For,ForEach,Invoke。
一、簡單使用
首先我們初始化一個List用于循環,這里我們循環10次。(后面的代碼都會按這個標準進行循環)
- Program.Data = new List<int>();
- for (int i = 0; i < 10; i++)
- {
- Data.Add(i);
- }
下面我們定義4個方法,分別為for,foreach,并行For,并行ForEach。并測試他們的運行時長。
- /// <summary>
- /// 是否顯示執行過程
- /// </summary>
- public bool ShowProcessExecution = false;
- /// <summary>
- /// 這是普通循環for
- /// </summary>
- private void Demo1()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- for (int i = 0; i < data.Count; i++)
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(data[i]);
- }
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("普通循環For運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
- /// <summary>
- /// 這是普通循環foreach
- /// </summary>
- private void Demo2()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- foreach (var i in data)
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(i);
- }
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("普通循環For運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
- /// <summary>
- /// 這是并行計算For
- /// </summary>
- private void Demo3()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- Parallel.For(0, data.Count, (i) =>
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(data[i]);
- });
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("并行運算For運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
- /// <summary>
- /// 這是并行計算ForEach
- /// </summary>
- private void Demo4()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- Parallel.ForEach(data, (i) =>
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(i);
- });
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("并行運算ForEach運行時長:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
下面是運行結果:
這里我們可以看出并行循環在執行效率上的優勢了。
結論1:在對一個數組內的每一個項做單獨處理時,完全可以選擇并行循環的方式來提升執行效率。
原理1:并行計算的線程開啟是緩步開啟的,線程數量1,2,4,8緩步提升。(不詳,PLinq最多64個線程,可能這也是64)
二、 并行循環的中斷和跳出
當在進行循環時,偶爾會需要中斷循環或跳出循環。下面是兩種跳出循環的方法Stop和Break,LoopState是循環狀態的參數。
- /// <summary>
- /// 中斷Stop
- /// </summary>
- private void Demo5()
- {
- List<int> data = Program.Data;
- Parallel.For(0, data.Count, (i, LoopState) =>
- {
- if (data[i] > 5)
- LoopState.Stop();
- Thread.Sleep(500);
- Console.WriteLine(data[i]);
- });
- Console.WriteLine("Stop執行結束。");
- }
- /// <summary>
- /// 中斷Break
- /// </summary>
- private void Demo6()
- {
- List<int> data = Program.Data;
- Parallel.ForEach(data, (i, LoopState) =>
- {
- if (i > 5)
- LoopState.Break();
- Thread.Sleep(500);
- Console.WriteLine(i);
- });
- Console.WriteLine("Break執行結束。");
- }
執行結果如下:
結論2:使用Stop會立即停止循環,使用Break會執行完畢所有符合條件的項。
三、并行循環中為數組/集合添加項
上面的應用場景其實并不是非常多見,畢竟只是為了遍歷一個數組內的資源,我們更多的時候是為了遍歷資源,找到我們所需要的。那么請繼續看。
下面是我們一般會想到的寫法:
- private void Demo7()
- {
- List<int> data = new List<int>();
- Parallel.For(0, Program.Data.Count, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Add(Program.Data[i]);
- });
- Console.WriteLine("執行完成For.");
- }
- private void Demo8()
- {
- List<int> data = new List<int>();
- Parallel.ForEach(Program.Data, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Add(Program.Data[i]);
- });
- Console.WriteLine("執行完成ForEach.");
- }
看起來應該是沒有問題的,但是我們多次運行后會發現,偶爾會出現錯誤如下:
這是因為List是非線程安全的類,我們需要使用System.Collections.Concurrent命名空間下的類型來用于并行循環體內。
類 | 說明 |
BlockingCollection<T> | 為實現 IProducerConsumerCollection<T> 的線程安全集合提供阻止和限制功能。 |
ConcurrentBag<T> | 表示對象的線程安全的無序集合。 |
ConcurrentDictionary<TKey, TValue> | 表示可由多個線程同時訪問的鍵值對的線程安全集合。 |
ConcurrentQueue<T> | 表示線程安全的先進先出 (FIFO) 集合。 |
ConcurrentStack<T> | 表示線程安全的后進先出 (LIFO) 集合。 |
OrderablePartitioner<TSource> | 表示將一個可排序數據源拆分成多個分區的特定方式。 |
Partitioner | 提供針對數組、列表和可枚舉項的常見分區策略。 |
Partitioner<TSource> | 表示將一個數據源拆分成多個分區的特定方式。 |
那么我們上面的代碼可以修改為,加了了ConcurrentQueue和ConcurrentStack的最基本的操作。
- /// <summary>
- /// 并行循環操作集合類,集合內只取5個對象
- /// </summary>
- private void Demo7()
- {
- ConcurrentQueue<int> data = new ConcurrentQueue<int>();
- Parallel.For(0, Program.Data.Count, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Enqueue(Program.Data[i]);//將對象加入到隊列末尾
- });
- int R;
- while (data.TryDequeue(out R))//返回隊列中開始處的對象
- {
- Console.WriteLine(R);
- }
- Console.WriteLine("執行完成For.");
- }
- /// <summary>
- /// 并行循環操作集合類
- /// </summary>
- private void Demo8()
- {
- ConcurrentStack<int> data = new ConcurrentStack<int>();
- Parallel.ForEach(Program.Data, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Push(Program.Data[i]);//將對象壓入棧中
- });
- int R;
- while (data.TryPop(out R))//彈出棧頂對象
- {
- Console.WriteLine(R);
- }
- Console.WriteLine("執行完成ForEach.");
- }
ok,這里返回一個序列的問題也解決了。
結論3:在并行循環內重復操作的對象,必須要是thread-safe(線程安全)的。集合類的線程安全對象全部在System.Collections.Concurrent命名空間下。
四、返回集合運算結果/含有局部變量的并行循環
使用循環的時候經常也會用到迭代,那么在并行循環中叫做 含有局部變量的循環 。下面的代碼中詳細的解釋,這里就不啰嗦了。
- /// <summary>
- /// 具有線程局部變量的For循環
- /// </summary>
- private void Demo9()
- {
- List<int> data = Program.Data;
- long total = 0;
- //這里定義返回值為long類型方便下面各個參數的解釋
- Parallel.For<long>(0, // For循環的起點
- data.Count, // For循環的終點
- () => 0, // 初始化局部變量的方法(long),既為下面的subtotal的初值
- (i, LoopState, subtotal) => // 為每個迭代調用一次的委托,i是當前索引,LoopState是循環狀態,subtotal為局部變量名
- {
- subtotal += data[i]; // 修改局部變量
- return subtotal; // 傳遞參數給下一個迭代
- },
- (finalResult) => Interlocked.Add(ref total, finalResult) //對每個線程結果執行的最后操作,這里是將所有的結果相加
- );
- Console.WriteLine(total);
- }
- /// <summary>
- /// 具有線程局部變量的ForEach循環
- /// </summary>
- private void Demo10()
- {
- List<int> data = Program.Data;
- long total = 0;
- Parallel.ForEach<int, long>(data, // 要循環的集合對象
- () => 0, // 初始化局部變量的方法(long),既為下面的subtotal的初值
- (i, LoopState, subtotal) => // 為每個迭代調用一次的委托,i是當前元素,LoopState是循環狀態,subtotal為局部變量名
- {
- subtotal += i; // 修改局部變量
- return subtotal; // 傳遞參數給下一個迭代
- },
- (finalResult) => Interlocked.Add(ref total, finalResult) //對每個線程結果執行的最后操作,這里是將所有的結果相加
- );
- Console.WriteLine(total);
- }
結論4:并行循環中的迭代,確實很傷人。代碼太難理解了。
五、PLinq(Linq的并行計算)
上面介紹完了For和ForEach的并行計算盛宴,微軟也沒忘記在Linq中加入并行計算。下面介紹Linq中的并行計算。
4.0中在System.Linq命名空間下加入了下面幾個新的類:
類 | 說明 |
ParallelEnumerable | 提供一組用于查詢實現 ParallelQuery{TSource} 的對象的方法。這是 Enumerable 的并行等效項。 |
ParallelQuery | 表示并行序列。 |
ParallelQuery<TSource> | 表示并行序列。 |
原理2:PLinq最多會開啟64個線程
原理3:PLinq會自己判斷是否可以進行并行計算,如果不行則會以順序模式運行。
原理4:PLinq會在昂貴的并行算法或成本較低的順序算法之間進行選擇,默認情況下它選擇順序算法。
在ParallelEnumerable中提供的并行化的方法:
ParallelEnumerable 運算符 | 說明 |
AsParallel() | PLINQ 的入口點。指定如果可能,應并行化查詢的其余部分。 |
AsSequential() | 指定查詢的其余部分應像非并行 LINQ 查詢一樣按順序運行。 |
AsOrdered() | 指定 PLINQ 應保留查詢的其余部分的源序列排序,直到例如通過使用 orderby 子句更改排序為止。 |
AsUnordered() | 指定查詢的其余部分的 PLINQ 不需要保留源序列的排序。 |
WithCancellation() | 指定 PLINQ 應定期監視請求取消時提供的取消標記和取消執行的狀態。 |
WithDegreeOfParallelism() | 指定 PLINQ 應當用來并行化查詢的處理器的最大數目。 |
WithMergeOptions() | 提供有關 PLINQ 應當如何(如果可能)將并行結果合并回到使用線程上的一個序列的提示。 |
WithExecutionMode() | 指定 PLINQ 應當如何并行化查詢(即使默認行為是按順序運行查詢)。 |
ForAll() | 多線程枚舉方法,與循環訪問查詢結果不同,它允許在不首先合并回到使用者線程的情況下并行處理結果。 |
Aggregate() 重載 | 對于 PLINQ 唯一的重載,它啟用對線程本地分區的中間聚合以及一個用于合并所有分區結果的最終聚合函數。 |
下面是PLinq的簡單代碼:
- /// <summary>
- /// PLinq簡介
- /// </summary>
- private void Demo11()
- {
- var source = Enumerable.Range(1, 10000);
- //查詢結果按source中的順序排序
- var evenNums = from num in source.AsParallel().AsOrdered()
- where num % 2 == 0
- select num;
- //ForAll的使用
- ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();
- var query = from num in source.AsParallel()
- where num % 10 == 0
- select num;
- query.ForAll((e) => concurrentBag.Add(e * e));
- }
上面代碼中使用了ForAll,ForAll和foreach的區別如下:
PLinq的東西很繁雜,但是都只是幾個簡單的方法,熟悉下方法就好了。