前言
這篇文章主要介紹整個框架用到的最核的一個設計模式:反應器模式。這個設計模式可以在《面向對象的軟件架構》中詳細了解,沒有這本書的小伙伴不要急,我通過咱們的SimpleRpc來告訴大家這個設計模式是如何運用的。之所以它叫反應器模式,是因為它是處理事件的一種比較優美的框架。如何優美,我們慢慢道來。
如何設計一個高吞吐量的web服務?
web服務會面對大量的網絡請求,服務要對這些請求進行處理,如何設計我們的web服務呢?有如下幾種模型供參考。
- 單線程模型 系統中使用唯一的一個線程處理網絡數據的讀,對請求數據的處理,以及發送響應。當一個網絡請求到來時,工作線程通過accept獲取到活動socket,接下來該線程順序的讀取數據、處理數據、生成響應數據、寫回socket。這個模型有一個明顯的缺點:當服務處理一個請求時,其它請求將阻塞得不到響應。它難以勝任高并發大吞吐量的web服務。
- 多線程模型 每當accept返回一個新的活動socket后,主線程就創建一個新線程進行數據的讀、處理、響應。這樣做的好處就是當一個線程處理請求時,其它線程可以接收新的請求并進行響應。它的缺點也很明顯:當請求的并發量多的時候,系統會同時有多個線程工作。當線程非常多時,cpu需要在多個線程之間做切換,切換的開銷將會增大,不是一個伸縮性很好的設計。我們可以使用線程池來優化這個設計,但是這么做也有一個問題:如果某些客戶端與服務端建立連接后并不是馬上發送數據,那么此時服務端會有大量的線程就都hang在socket的讀上面(因為網絡數據遲遲不發送),cpu使用率不高。
如何克服上面兩種模型的弊端呢?我們的反應器模式開始大展身手。
反應器模式
- Select/Epoll簡介: 上面兩種模型沒有用到操作系統提供的更高級的網絡數據處理機制:select模型/Epoll模型。這是一個能夠同時監聽多個socket句柄上的動作的機制,由操作系統支持。它維護一個監聽列表,用戶可以動態添加和刪除需要監聽的socket句柄。如果一個句柄被加入了它的監聽列表,當句柄有新數據到來或者句柄可寫時就會產生一個電位差提醒操作系統有socket句柄可以操作(讀或者寫),這時select/Epoll就會告訴用戶可以在哪些socket句柄上做操作。它的優點是不會因為其中任何一個句柄的阻塞而忽略其它句柄上的可操作事件。
有了Select/Epoll這個利器,我們就可以設計反應器了:
- Reactor(反應器)接口定義:
regist:在一個socket句柄上注冊操作函數。
remove:從反應器中移除對某個句柄的監聽。
handle_events:通過系統提供的select/epoll_wait獲取所監聽句柄上的事件通知,并調用對應句柄所注冊的函數進行處理。
- EventHandler接口定義:
handle_read: 對句柄上的讀事件進行操作
handle_write: 對句柄上的寫事件進行操作
SimpleRpc中的核心框架實現
Reactor使用Epoll實現,Epoll的具體使用方式這里就不贅述了,因為它不是本文的主要寫作目的。這里面講述一下SimpleRpc網絡事件處理的最核心的三個類:Acceptor,UpstreamHandler,DownstreamHandler。
- Acceptor接受器:
Acceptor::Acceptor(const InetAddr &addr, Reactor *reactor){ _reactor = reactor; int sock_fd = socket(AF_INET, SOCK_STREAM, 0); sockaddr sock_addr = addr.addr(); int ret = bind(sock_fd, &sock_addr, sizeof(sockaddr)); if(ret != 0){ LOG("bind error"); exit(0); } ret = listen(sock_fd, 1000); if(ret != 0) { LOG("listen error"); exit(0); } _reactor->regist(sock_fd, this); }
首先,服務端使用socket函數創建一個socket_fd,綁定好IP和端口后,acceptor把這個socket_fd加入到reactor監聽列表中,當有客戶端主動發起與該服務的連接時,服務端該socket_fd上會有讀事件產生,Acceptor的handle_read函數將會被調用。
void Acceptor::handle_read(int sock_fd) { struct sockaddr addr; socklen_t size = sizeof(struct sockaddr_in); int fd = accept(sock_fd, &addr, &size); _reactor->regist(fd, new UpstreamHandler(fd, _reactor)); //UpstreamHandler用來處理客戶端請求。 }
handle_read函數接收到這個socket_fd后,知道這是客戶端的請求連接,于是調用accept函數獲取這個新連接的socket句柄fd。站在服務端角度看,客戶端就是它的上游,對于上游事件的處理需要用UpstreamHandler。Acceptor在reactor上綁定fd與UpstreamHandler,reactor等待后續的事件的到來。Acceptor就像一只老母雞,不斷的下蛋(蛋就是新生產出的fd),并把蛋放入到reactor等待進一步孵化。
- UpstreamHandler:上游請求事件處理
void UpstreamHandler::handle_read(int fd) { if(fd != _sock_fd){ return; } StreamEvent e; e.fd = _sock_fd;
e.type = 0; //客戶端請求事件 _reactor->remove(fd); //移除fd //由每個工作線程自己去讀fd,客戶端請求事件 ThreadPool<UpstreamEvent>::get_instance()->put_event(e); //放入隊列 //自殺 delete this; }
客戶端請求服務端建立連接后,第一步就是要向服務端發送請求數據。客戶端發送數據后,服務端reactor發現socket句柄上有讀事件,就調用對應句柄上的事件處理函數(UpstreamHandler::handle_read())。該事件處理函數并不直接進行數據的讀取和計算,而是先從reactor中移除該fd(防止后續數據到來,reactor重復獲取讀事件),之后把fd封裝成一個事件結構體放入阻塞隊列中,由共享該阻塞隊列的線程池進行后續處理。
- DownstreamHandler:下游事件處理
void DownstreamHandler::handle_read(int fd) { char head[4]; if(fd != _sock_fd){ return; } _reactor->remove(fd); Connection conn(fd); conn.recv_n(head, 4); int size = *((int *)head); char *buf = new char[size]; conn.recv_n(buf, size); close(fd); printf("Downstream Handler close fd:%d\n", fd); //下游響應 _response->deserialize(buf, size); if(_result_handler != NULL) { _result_handler->data_comeback(); } delete[] buf; //自殺 delete this; }
站在客戶端的角度來看,服務端就是其下游,客戶端對服務端的響應數據的處理需要使用DownstreamHandler。當服務端返回響應數據時,客戶端的reactor會檢測到對應socket句柄上的讀事件,隨后調用對應事件處理函數(handle_read)。該函數首先從reactor移除對該fd的監聽,防止reactor重復檢測事件并調用處理函數;之后就接收數據、反序列化得到響應結構體。
文章列表
留言列表