.Net4.0 Parallel編程(二)Data Parallelism 中
在上篇文章中看過了使用Parrallel.For、Parael.Foreach在效率上給我們帶來的提高。本文就來如何終止循環、線程局部變量 進行說明。
Thread-Local Variables
首先我們來看下線程局部變量,是的我們也許一直在想我們如何去定義一個線程局部變量呢。先看段順序執行的代碼:
public void NormalSequenceTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
for (int i = 0; i < nums.Length;i++ )
{
total += nums[i];
}
Console.WriteLine("The total is {0}", total);
}
執行結果:
我們再來看這段代碼:
public void NormalParallelTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.For(0,nums.Length,i=>
{
total += nums[i];
});
Console.WriteLine("The total is {0}", total);
}
執行結果:
再運行下:
也許我們會感到很奇怪為什么會這樣呢,其實我們想想就可以明白了,total變量是公共的,而我們的程序是多個線程的加,而多個線程之間是不能把數據共享的。其實我們需要的是在每個線程中計算出一個和值,然后再進行累加。我們來看看線程局部變量:
public void ThreadLocalTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.For<long>(0, nums.Length, () => 0, (j, loop, subtotal) =>
{
subtotal += nums[j];
return subtotal;
},
(x) => Interlocked.Add(ref total, x)
);
Console.WriteLine("The total is {0}", total);
}
我們再看下執行結果:
下面說下泛型方法Parallel.For<T>方法,方法的原型:
TLocal:線程變量的類型;第一個、第二個參數就不必多說了,就是其實值跟結束值。
localInit:每個線程的線程局部變量初始值的設置;
body:每次循環執行的方法,其中方法的最后一個參數就是線程局部變量;
localFinally:每個線程之后執行的方法。
相信這樣解釋后就能明白了為什么需要線程局部變量了,也明白如何使用線程局部變量了。我們再看看在Parallel.Foreach<T>中如何使用:
public void ForeachThreadLocalTest()
{
int[] nums = Enumerable.Range(0, 1000000).ToArray();
long total = 0;
Parallel.ForEach<int,long>(nums,()=>0,(member,loopState,subTotal)=>
{
subTotal += member;
return subTotal;
},
(perLocal)=> Interlocked.Add(ref total,perLocal)
);
Console.WriteLine("The total is {0}", total);
}
要注意的是,我們必須要使用ForEach<TSource, TLocal>,因為第一個參數表示的是迭代源的類型,第二個表示的是線程局部變量的類型,其方法的參數跟For是差不多的。
Break、Stop
首先我們可以看到在Parallel.For的一個重載方法中:
在委托的最后一個參數類型為ParallelLoopState,而ParallelLoopState里面提供給我們兩個方法:Break、Stop來終止迭代,而Break跟Stop的區別是什么呢?我們來看兩段代碼:
{
var Stack = new ConcurrentStack<string>();
Parallel.For(0, 10000, (i, loopState) =>
{
if (i < 1000)
Stack.Push(i.ToString());
else
{
loopState.Stop();
return;
}
});
Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}
private void BreakLoop()
{
var Stack = new ConcurrentStack<string>();
var stringList = this.ConstructString(10000);
Parallel.For(0, stringList.Count, (i, loopState) =>
{
Stack.Push(stringList[i]);
if (stringList[i].Contains("999"))
{
loopState.Break();
}
});
Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}
private List<string> ConstructString(int number)
{
var stringList = new List<string>();
Parallel.For(0, number - 1, i =>
{
stringList.Add(i.ToString());
});
return stringList;
}
測試方法:
public void LoopTest()
{
StopLoop();
BreakLoop();
}
來看看運行結果吧:
其實這個例子只是想告訴大家,為什么第一個用Stop,第二個用Break。原因是:第一個例子中我只關心的是循環的迭代變量i的值,我只要1000個字符串,而不去管這1000個字符串是什么東西。所以當達到1000時,我們就立刻停止所有的迭代包括其他線程上的。而第二個方法中我們是判斷的源中的某個索引值,這個時候有可能較早的元素還未處理。
其實在我們調用過Stop或者Break方法后,循環上的其他的線程可能還會運行一段時間,其實我們可以通過IsStopped屬性來判斷循環是在其他線程上停止。Foreach中的使用就不再看了,跟For是一樣的。
總結
在本文中,主要介紹了如何停止循環、使用線程局部變量。在里面我們看到了我們在使用并行開發時,有很多東西是需要我們去注意的。在下文中將介紹下異常處理、取消循環等話題。