介紹
NSQ是一個實時的分布式消息平臺。它的設計目標是為在多臺計算機上運行的松散服務提供一個現代化的基礎設施骨架。這篇文章介紹了 基于go語言的NSQ的內部架構,它能夠為高吞吐量的網絡服務器帶來 性能的優化,穩定性和魯棒性。可以說, 如果不是因為我們在bitly使用go語言,NSQ就不會存在。這里既會講NSQ的功能也會涉及語言提供的特征。當然,語言會影響思維,這次也不例外。現在回想起來,選擇使用go語言已經收到了十倍的回報。由語言帶來的興奮和社區的積極反饋為這個項目提供了極大的幫助。
概要
NSQ是由3個進程組成的:
-
nsqd是一個接收、排隊、然后轉發消息到客戶端的進程。
-
nsqlookupd 管理拓撲信息并提供最終一致性的發現服務。
-
nsqadmin用于實時查看集群的統計數據(并且執行各種各樣的管理任務)。
NSQ中的數據流模型是由streams和consumers組成的tree。topic是一種獨特的stream。channel是一個訂閱了給定topic的consumers 邏輯分組。
單個nsqd可以有多個topic,每個topic可以有多個channel。channel接收這個topic所有消息的副本,從而實現多播分發,而channel上的每個消息被分發給它的訂閱者,從而實現負載均衡。這些基本成員組成了一個可以表示各種簡單和復雜拓撲結構的強大框架。有關NSQ的設計的更多信息請參見設計文檔。
Topics 和 Channels
Topics 和 channels,是NSQ的核心成員,它們是如何使用go語言的特點來設計系統的最好示例。Go的channels(為防止歧義,以下簡稱為“go-chan”)是表達隊列的一種自然方式,因此一個NSQ的topic/channel,其核心就是一個存放消息指針的go-chan緩沖區。緩沖區的大小由 --mem-queue-size 配置參數確定。
讀取數據后,向topic發布消息的行為包括:
-
實例化消息結構 (并分配消息體的字節數組)
-
read-lock 并獲得 Topic
-
read-lock 并檢查是否可以發布
-
發送到go-chan緩沖區
為了從一個topic和它的channels獲得消息,topic不能按典型的方式用go-chan來接收,因為多個goroutines在一個go-chan上接收將會分發消息,而期望的結果是把每個消息復制到所有channel(goroutine)中。此外,每個topic維護3個主要goroutine。第一個叫做 router,負責從傳入的go-chan中讀取新發布的消息,并存儲到一個隊列里(內存或硬盤)。
第二個,稱為 messagePump, 它負責復制和推送消息到如上所述的channel中。
第三個負責 DiskQueue IO,將在后面討論。
Channels稍微有點復雜,它的根本目的是向外暴露一個單輸入單輸出的go-chan(事實上從抽象的角度來說,消息可能存在內存里或硬盤上);
另外,每一個channel維護2個時間優先級隊列,用于延時和消息超時的處理(并有2個伴隨goroutine來監視它們)。并行化的改善是通過管理每個channel的數據結構來實現,而不是依靠go運行時的全局定時器。
注意:在內部,go運行時使用一個優先級隊列和goroutine來管理定時器。它為整個time包(但不局限于)提供了支持。它通常不需要用戶來管理時間優先級隊列,但一定要記住,它是一個有鎖的數據結構,有可能會影響 GOMAXPROCS>1 的性能。請參閱runtime/time.goc。
Backend / DiskQueue
NSQ的一個設計目標是綁定內存中的消息數目。它是通過DiskQueue(它擁有前面提到的的topic或channel的第三個goroutine)透明的把消息寫入到磁盤上來實現的。
由于內存隊列只是一個go-chan,沒必要先把消息放到內存里,如果可能的話,退回到磁盤上:
1
2
3
4
5
6
7
8
9
10
|
for msg := range c.incomingMsgChan { select { case c.memoryMsgChan <- msg: default : err := WriteMessageToBackend(&msgBuf, msg, c.backend) if err != nil { // ... handle errors ... } } } |
利用go語言的select語句,只需要幾行代碼就可以實現這個功能:上面的default分支只有在memoryMsgChan 滿的情況下才會執行。
NSQ也有臨時channel的概念。臨時channel會丟棄溢出的消息(而不是寫入到磁盤),當沒有客戶訂閱后它就會消失。這是一個Go接口的完美用例。Topics和channels有一個的結構成員被聲明為Backend接口,而不是一個具體的類型。一般的 topics和channels使用DiskQueue,而臨時channel則使用了實現Backend接口的DummyBackendQueue。
減少垃圾回收的壓力
在任何帶有垃圾回收的環境里,你都會多多少少感受到吞吐量(工作有效性)、延遲(響應能力)、駐留集大小(內存使用量)的壓力。就 Go 1.2 而言,垃圾回收有標記-清除(并發的)、不再生、不緊湊、阻止一切運行、大體精準的特點。大體精準是因為剩下的工作沒有及時的完成(這是 Go 1.3 的計劃)。Go 的垃圾回收機制當然會持續改進,但普遍的真理是:創建的垃圾越少,回收垃圾的時間越少。
首先,理解垃圾回收是如何在實際的工作負載中運行的是非常重要的。為此,nsqd 以 statsd 的格式 (與其它內部指標一起) 發布垃圾回收的統計信息。nsqadmin 顯示這些指標的圖表,可以讓你深入了解它在頻率和持續時間兩方面產生的影響:
為了減少垃圾,你需要知道它們是在哪生成的。再次回到Go的工具鏈,它提供的答案如下:
-
使用testing包和go test -benchmen來基準測試熱點代碼路徑。它配置了每個迭代分配的數字(基準的運行可與benchcmp進行比較)。
-
使用 go build -gcflags -m 創建,將會輸出逃逸分析的結果。
除此之外,它還提供了nsqd 的如下優化:
-
避免把[]byte 轉化為字符串類型.
-
預分配切片(特別是make的能力)并總是知曉鏈中各個條目的數量和大小。
-
提供各種配置面板(如消息大小)的限制。
-
避免封裝(如使用interface{})或者不必要的包裝類(例如 用一struct給一個多值的go-chan).
-
在熱代碼路徑(它指定的)中避免使用defer。
TCP 協議
NSQ的TCP協議是一個閃亮的會話典范,在這個會話中垃圾回收優化的理論發揮了極大的效用。
協議的結構是一個有很長的前綴框架,這使得協議更直接,易于編碼和解碼。
1
2
3
4
5
|
[x][x][x][x][x][x][x][x][x][x][x][x]... | (int32) || (int32) || (binary) | 4-byte || 4-byte || N-byte ------------------------------------... size frame ID data |
因為框架的組成部分的確切類型和大小是提前知道的,所以我們可以規避了使用方便的編碼二進制包的Read()和Write()封裝(及它們外部接口的查找和會話)反之我們使用直接調用 binary.BigEndian方法。
為了消除socket 輸入輸出的系統調用,客戶端net.Conn被封裝了bufio.Reader和bufio.Writer。這個Reader通過暴露ReadSlice(),復用了它自己的緩沖區。這樣幾乎消除了讀完socket時的分配,這極大的降低了垃圾回收的壓力。這可能是因為與數據相關的大多數命令并沒有逃逸(在邊緣情況下這是假的,數據被強制復制)。
在更低層,MessageID 被定義為 [16]byte,這樣可以將其作為 map 的 key(slice 無法用作 map 的 key)。然而,考慮到從 socket 讀取的數據被保存為 []byte,勝于通過分配字符串類型的 key 來產生垃圾,并且為了避免從 slice 到 MessageID 的支撐數組產生復制操作,unsafe 包被用來將 slice 直接轉換為 MessageID:
1
|
id := *(*nsq.MessageID)(unsafe.Pointer(&msgID)) |
注意: 這是個技巧。如果編譯器對此已經做了優化,或者 Issue 3512 被打開可能會解決這個問題,那就不需要它了。issue 5376 也值得通讀,它講述了在無須分配和拷貝時,和 string 類型可被接收的地方,可以交換使用的“類常量”的 byte 類型。
類似的,Go 標準庫僅僅在 string 上提供了數值轉換方法。為了避免 string 的分配,nsqd 使用了 慣用的十進制轉換方法,用于對 []byte 直接操作。
這些看起來像是微優化,但 TCP 協議包含了一些最熱的代碼執行路徑。總體來說,以每秒數萬消息的速度來說,它們對分配和系統開銷的數量有著顯著的影響:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
benchmark old ns/op new ns/op delta BenchmarkProtocolV2Data 3575 1963 -45.09% benchmark old ns/op new ns/op delta BenchmarkProtocolV2Sub256 57964 14568 -74.87% BenchmarkProtocolV2Sub512 58212 16193 -72.18% BenchmarkProtocolV2Sub1k 58549 19490 -66.71% BenchmarkProtocolV2Sub2k 63430 27840 -56.11% benchmark old allocs new allocs delta BenchmarkProtocolV2Sub256 56 39 -30.36% BenchmarkProtocolV2Sub512 56 39 -30.36% BenchmarkProtocolV2Sub1k 56 39 -30.36% BenchmarkProtocolV2Sub2k 58 42 -27.59% |
HTTP
NSQ的HTTP API是基于 Go's net/http 包實現的. 就是 常見的HTTP應用,在大多數高級編程語言中都能直接使用而無需額外的三方包。 簡潔就是它最有力的武器,Go的 HTTP tool-chest最強大的就是其調試功能. net/http/pprof 包直接集成了HTTP server,可以方便的訪問CPU, heap, goroutine, and OS 進程文檔 .gotool就能直接實現上述操作:
1
|
$ go tool pprof http: //127 .0.0.1:4151 /debug/pprof/profile |
這對于調試和實時監控進程非常有用!
此外,/stats端端返回JSON或是美觀的文本格式信息,這讓管理員使用命令行實時監控非常容易:
1
|
|
打印出的結果如下:
此外, Go 1.2 還有很多監控指標measurable HTTP performance gains. 每次更新Go版本后都能看到性能方面的改進,真是讓人振奮!
依賴關系
源于其它生態系統,使用GO(理論匱乏)語言的依賴管理還得花點時間去適應
NSQ 就并不是單一的整個 repo庫, 通過 _relative imports_ 而無需區別內部的包資源, 最終產生結構化的依賴管理。
主流的觀點有以下兩個:
-
Vendoring:拷貝應用需要的正確版本號到本地倉庫并修改import 路徑到本地庫地址
-
Virtual Env: 列出構建是需要的版本信息,創建包含相關信息的GOPATH環境變量
Note: 這僅僅應用于二級制包,對于可導入的包版本不起作用
NSQ使用 godep提供 (2) 中的實現.
它的實現原理是復制依賴關系到 Godeps文件中, 之后生成GOPATH環境變量。構建時,它使用Go環境中的工具鏈 來完成工作。 Godeps就是json格式,可以手動修改。
它還支持go的get. 例如,構建一個 NSQ版本:
1
|
$ godep get github.com /bitly/nsq/ ... |
測試
Go語言提供了內置的測試和基線。由于其簡單的并發操作建模,在測試環境里加入nsqd 實例輕而易舉。但是,在測試初始化的時候會有個問題:全局狀態。最明顯的就是引用運行態nsqd 實例的全局變量 i.e.var nsqd *NSQd.
于是某些測試就無可避免的使用局部變量去保存該值i.e.nsqd := NewNSQd(...).這也就意味著全局狀態并未指向運行態的值,使測試失去了意義。
應對這個問題,Context結構體被引入以保存配置項metadata和實時nsqd的父類。所有全局狀態的子引用都通過訪問該Context來安全的獲取相應值(主題,渠道,協議處理等等),這樣測試起來也更有保障。
可靠性
一個系統,如果在面對變幻的網絡環境和不可預知的事件時不具備可靠性,將不會是一個表現良好的分布式生產環境。NSQ的設計和實現方式,使它能容忍錯誤并以一種始終如一的,可預期的和穩定的方式來運行。它的首要的設計哲學是快速失敗,認為錯誤都是致命的,并提供一種方式來調試遇到的任何問題。不過,為了能有所行動,你必須要能夠檢測異常環境...
心跳檢測和超時
NSQ的TCP協議是需要推送的.在經過建立連接,三次握手,客戶在aRDYstate的訂閱數被置為0.當準備接受消息時,通過更新RDYstate來控制將要接受的消息數目。NSQ 客戶端libraries將在后臺持續管理這一環節,最終形成相應的消息流。 周期性的, nsqd 會發送心跳檢測連接狀態.客戶端可以設置這個間隔時間但nsqd需要在發送下調指令前收到上條請求的回復。 應用層面的心跳檢測和RDYstate組合能夠避免 head-of-line blocking,它會是心跳檢測失效 (i.e.如果用戶等待處理消息前OS的緩存已滿,則心跳檢測失效).
為了確保進程的正常工作,所有的網絡IO都會依據心跳檢測的間隔時間來設置邊界.這意味著你甚至可以斷開客戶端和 nsqd 的網絡連接,而不必擔心問題被發現并恰當的處理。一旦發現致命錯誤,客戶連接將被強關。發送中的消息超時并從新加入新的客戶端接受隊列。最后,錯誤日志會被保存并增加內部評價矩陣內容。
管理Goroutines
啟用goroutines很簡單,但后續工作卻不是那么容易弄好的。避免出現死鎖是一個挑戰。通常都是因為在排序上出了問題,goroutine可能在接到上游的消息前就收到了go-chan的退出信號。為啥提到這個?簡單,一個未正確處理的goroutine就是內存泄露。更深入的分析,nsqd 進程含有多個激活的goroutines。從內部情況來看,消息的所有權是不停在變得。為了能正確的關掉goroutines,實時統計所有的進程信息是非常重要的。雖沒有什么神奇的方法,但下面的幾點能讓工作簡單一點...
WaitGroups
sync 包提供了 sync.WaitGroup, 它可以計算出激活態的goroutines數(比提供退出的平均等待時間)
為了使代碼簡潔nsqd 使用如下wrapper:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } // can be used as follows: wg := WaitGroupWrapper{} wg.Wrap(func() { n.idPump() }) // ... wg.Wait() |
退出信號
在含有多個子goroutines中觸發事件最簡單的辦法就是用一個go-chan,并在完成后關閉。所有當中暫停的動作將被激活,這就無需再向每個goroutine發送相關的信號了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } // can be used as follows: wg := WaitGroupWrapper{} wg.Wrap(func() { n.idPump() }) // ... wg.Wait() |
同步退出
想可靠的,無死鎖,所有路徑都保有信息的實現是很難的。下面是一些提示:
-
理想情況下,在go-chan發送消息的goroutine也應為關閉消息負責.
-
如果消息需要保留,確保相關go-chans被清空(尤其是無緩沖的!),以保證發送者可以繼續進程.
-
另外,如果消息不再是相關的,在單個go-chan上的進程應該轉換到包含推出信號的select上 (如上所述)以保證發送者可以繼續進程.
一般的順序應該是:
-
停止接受新的連接(停止監聽)
-
向goroutines發出退出信號(見上文)
-
等待WaitGroup的goroutine中退出(見上文)
-
恢復緩沖數據
-
剩下的部分保存到磁盤
日志
最后,最重要的工作是記錄你的Go例程的入口和出口日志!這使得它更容易識別死鎖或泄漏的情況。nsqd日志行包括信息Go例程與他們的兄弟姐妹(和父母),如客戶端的遠程地址或主題/渠道名。日志是冗長的,但還不至于到接受不了的程度。這個是有兩面性的,但nsqd傾斜當故障發生時向日志中放入更多的信息,,而不是為了避免繁瑣而降低日志定位問題的有效性。
原文地址: http://www.oschina.net/translate/day-22-a-journey-into-nsq
相關文章:
文章列表