用 C# 簡單模擬 Google Go 語言中的 Channel 和 goroutine 機制
前段時間嘗試了一點 Google 的 Go 語言,感覺其很多特性還是不錯的。Go 語言旨在結合傳統編譯型的靜態語言和解釋型的動態語言的優點,在其中找到一個平衡。從而打造一個既快速(編譯執行),又方便編程的語言(動態語言往往語法簡單快捷)。同時,Go 語言還具備豐富的特性以支持并發編程,這在現在多核非常普及的情況下,是很重要和強大的一個功能。
Go 語言的并發特性主要有 goroutine, channel 等。goroutine - 可以大致理解為一種輕量級的線程(或微線程),它是一種“分配在同一個地址空間內的,能夠并行執行的函數”。同時,它是輕量級的,不需要像分配線程那樣分配獨立的棧空間。所以理論上講,我們可以很容易的分配很多個 goroutine, 讓它們并發執行,而其開銷則比多線程程序要小得多,從而可以讓程序支持比較大的并發性。channel - 顧名思義,就是通道。通道的目的是用來傳遞數據。在一個通道上我們可以執行數據的發送(Send)和接受(Receive)操作。對于非緩沖的 channel 而言,Receive 方法執行時,會判斷該通道上是否有值,如果沒有就會等待(阻塞),直到有一個值為止。同樣,在 channel 上有值,而尚未被一個 Receiver 接受的時候,Send 方法也會阻塞,直到 Channel 變空。這樣,通過一個簡單的機制就可以保證 Send 和 Receive 總是在不同的時間執行的,而且只有 Send 之后才能 Receive. 這樣就避免了常規的多線程編程中數據共享的問題。正如 Go 語言的文檔一句話所說:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通過共享內存來溝通;而是通過溝通來共享內存。
在常規的多線程編程里,我們總是定義好一些類變量,如果這些變量有可能被多個線程同時訪問,那么就需要加鎖。這樣帶來了一定的編程復雜性,如果代碼寫的稍有bug,則會導致讀/寫到錯誤的值。而通過 channel 來溝通,我們得到了一個更為清晰的溝通方式。兩個線程(或者 goroutine)要讀寫相同的數據,則創建一個通道,雙方通過對這個通道執行 Send / Receive 的操作來設值或取值即可,相對而言,比較不容易出錯。
為了更好的理解這個原理,我嘗試了在 C# 中實現類似的功能。相對于 goroutine, 我沒有去實現微線程,因為這需要更復雜的調度機制(打算接下來進一步研究這方面)。我們可以暫時利用 Thread 來簡單的模擬之。而 Channel, 則用 Semaphone 控制同步的 Send / Receive 就可以了。
首先讓我們來實現一個簡單的 Channel,思想上面已經說過了:
/// 先實現簡單的沒有緩沖的 Channel.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Channel<T>
{
T _value;
// 開始不能 Receive.
Semaphore _canReceive = new Semaphore(0, 1);
// 開始沒有值,可以 Send
Semaphore _canSend = new Semaphore(1, 1);
public T Receive()
{
// 等待有值
_canReceive.WaitOne();
T value = _value;
// 通知可以發送新的值了
_canSend.Release();
return value;
}
public void Send(T value)
{
// 如果是非緩沖的情況,則為阻塞式的,需要等待已有的值被一個 Receiver 接受完,
// 才能發送新值,不能連續 Send
_canSend.WaitOne();
_value = value;
// 通知可以接收了
_canReceive.Release();
}
}
接下來粗略的模擬實現 goroutine 的語法:
{
/// <summary>
/// 先簡單的用線程來模擬 goroutine. 因為使用 channel 通信,所以
/// 不需考慮線程之間的數據共享/同步問題
/// </summary>
/// <param name="action"></param>
public static void go(Action action)
{
new Thread(new ThreadStart(action)).Start();
}
}
有了這些,我們可以寫一個 test case 來驗證了。下面的代碼簡單的創建一個并發的 routine,分別做整數的 send, receive 操作,以驗證是否能正確的發送和接受值:
/// 測試多個 Sender 多個 Receiver 同時在一個 channel 上發送/接受消息
/// </summary>
private static void Test1()
{
var ch = new Channel<int>();
// 啟動多個 Sender
GoLang.go(() =>
{
var id = Thread.CurrentThread.ManagedThreadId;
for (var i = 0; i < 7; i++)
{
Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(3000));
Console.WriteLine("線程{0}發送值: {1}", id, i);
ch.Send(i);
}
});
GoLang.go(() =>
{
var id = Thread.CurrentThread.ManagedThreadId;
for (var i = 7; i < 15; i++)
{
Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(3000));
Console.WriteLine("線程{0}發送值: {1}", id, i);
ch.Send(i);
}
});
// 啟動多個 Receiver
GoLang.go(() =>
{
var id = Thread.CurrentThread.ManagedThreadId;
for (var i = 0; i < 5; i++)
{
//Console.WriteLine("線程{0}阻塞", id);
var value = ch.Receive();
Console.WriteLine("線程{0}獲得值: {1}", id, value);
}
});
GoLang.go(() =>
{
var id = Thread.CurrentThread.ManagedThreadId;
for (var i = 0; i < 5; i++)
{
//Console.WriteLine("線程{0}阻塞", id);
var value = ch.Receive();
Console.WriteLine("線程{0}獲得值: {1}", id, value);
}
});
GoLang.go(() =>
{
var id = Thread.CurrentThread.ManagedThreadId;
for (var i = 0; i < 5; i++)
{
//Console.WriteLine("線程{0}阻塞", id);
var value = ch.Receive();
Console.WriteLine("線程{0}獲得值: {1}", id, value);
}
});
}
再嘗試實現一下 Go 語言文檔里舉出的一個例子 - 篩法求素數:
(見:http://golang.org/doc/go_tutorial.html, Prime numbers)
{
public void Main()
{
var primes = Sieve();
// 測試:打印前100個素數
for (var i = 0; i < 100; i++)
{
Console.WriteLine(primes.Receive());
}
}
/// <summary>
/// 篩法求素數
/// </summary>
/// <returns></returns>
Channel<int> Sieve()
{
var @out = new Channel<int>();
GoLang.go(() =>
{
var ch = Generate();
for (; ; )
{
// 當前序列中的第一個值總是素數
var prime = ch.Receive();
// 將其發送到輸出序列的尾部
@out.Send(prime);
// 用這個素數對列表進行過濾,在進入下一次循環,可以保證至少第一個數是素數
ch = Filter(ch, prime);
}
});
return @out;
}
/// <summary>
/// 產生從2開始的自然數的無窮序列,這是原始數列
/// 其開始元素 2 是一個素數。
/// </summary>
/// <returns></returns>
Channel<int> Generate()
{
var ch = new Channel<int>();
GoLang.go(() =>
{
for (var i = 2; ; i++)
{
ch.Send(i);
}
});
return ch;
}
/// <summary>
/// 從輸入 channel 里逐個讀取值,將不能被 prime 整除
/// 的那些發送到輸出 channel (即用 prime 對 @in 序列進行一次篩選)
/// </summary>
Channel<int> Filter(Channel<int> @in, int prime)
{
var @out = new Channel<int>();
GoLang.go(() =>
{
for (; ; )
{
var i = @in.Receive();
if (i % prime != 0)
{
@out.Send(i);
}
}
});
return @out;
}
}
下面是整個測試工程的 Main 方法:
{
static void Main(string[] args)
{
Test1();
new PrimeNumbers().Main();
Console.ReadLine();
}
}
因為代碼中已經詳細注釋了,不多做解釋。可以看到,利用 Channel 的概念(好像和 Reactive Programming 有點關系?),我們可以更清晰的構建多線程或者并發的應用程序。
學習其他語言,并不是為了學習其特定的語法,而是學習一種思想。