前言
本篇文章講述客戶端與服務端的具體設計細節。有細心的小伙伴發現,客戶端和服務端的工作方式不一樣:服務端是多線程計算模型,利用工作線程完成數據的讀取,而客戶端是單線程(利用Reactor線程完成數據的讀取)。這么做的原因有二:首先我們認為我們的使用RPC的初衷是由于CPU計算是瓶頸,不得已把計算放到多臺機器上,所以服務端采用多線程計算模型;其次我們認為網絡IO只要不是客戶端故意阻塞,那么無論是請求數據還是響應數據只需要一次接收就可以收全,不會有線程長時間阻塞在網絡上,所以客戶端就使用反應器線程進行接收響應數據。
客戶端同步和異步調用
SimpleRpc提供了同步調用和異步調用的方法,使用區別在于傳遞的參數不同,如下所示。
//異步請求 int async_request(Server &server, Request *request, Response *response, ResultHandler *handler); //同步請求 int sync_request(Server &server, Request *request, Response *response);
那么SimpleRpc對于同步和異步調用是如何支持的呢?我們重新看一下DownstreamHandler對數據的處理方式:
void DownstreamHandler::handle_read(int fd) { char head[4]; Connection conn(fd); conn.recv_n(head, 4); int size = *((int *)head); char *buf = new char[size]; conn.recv_n(buf, size); close(fd); printf("Downstream Handler close fd:%d\n", fd); //下游響應 _response->deserialize(buf, size); //如果有result_handler,則調用data_comeback鉤子函數 if(_result_handler != NULL) { _result_handler->data_comeback(); //對于同步調用,這個方法會喚醒客戶端使其從wait中返回 } delete[] buf; //自殺 delete this; }
result_handler的調用是關鍵,我們正是利用這一點做到同步調用和異步調用。ResultHandler的類UML如下:
DefaultResultHandler是SimpleRpc的默認結果處理方式,UserDefinedResultHandler由用戶自己選擇性的定義并實現。當客戶端工作線程對服務端相應數據處理完畢后,調用ResultHandler的data_comeback方法執行這個鉤子函數。
- 同步調用的實現:
int SimpleRpcClient::sync_request(Server &server, Request *request, Response *response) { Mutex mutex; Connection conn; Condition cond(&mutex); InetAddr addr(server.get_port_str(), server.get_ip_str()); Connector conntor(addr); int ret = conntor.Connect(conn); //建立與服務端的連接 if(ret == -1){ LOG("connect error\n"); return -1; } int size = request->bytes(); //獲取請求序列化后的字節數 char *buf = new char[size + 4]; //用額外4字節存放數據長度,方便接收端校驗 if(buf == NULL) { LOG("request oom, request need %d bytes\n", size + 4); conn.Close(); return -1; } int payload = request->serialize(buf + 4, size); //序列化 memcpy(buf, &payload, sizeof(int)); ret = conn.send_n(buf, payload + 4); //發送序列化數據 if(ret != 0) { LOG("connection send error\n"); return -1; } DefaultResultHandler *handler = new DefaultResultHandler(&cond, &mutex); DownstreamHandler *down_handler = new DownstreamHandler(conn.sock(), response, Reactor::get_instance(), handler); Reactor::get_instance()->regist(conn.sock(), down_handler); //注冊到reactor中等待響應事件的通知 handler->finish(); //阻塞調用,直到cond得到喚醒通知 delete[] buf; delete handler; return 0; }
我們的DefautlResultHandler擁有系統等待條件(Condition),并且作為DownstreamHandler的成員之一。客戶端發送請求數據后,構造DownstreamHandler并注冊到reactor中,等待服務端響應事件的通知。干完以上的事情之后,客戶端應用線程調用DefaultResultHandler的finish方法阻塞直到得到完成通知,這樣達到了同步調用的效果。
- 異步調用的實現:
異步調用沒有使用DefaultResultHandler作為參數傳遞給DownstreamHandler,而是把用戶自定義的ResultHanlder傳遞進去,具體的控制流程(data_comeback函數)由用戶自己定義。
int SimpleRpcClient::async_request( Server &server, Request *request, Response *response, ResultHandler *handler) { ... DownstreamHandler *down_handler = new DownstreamHandler(conn.sock(), response, Reactor::get_instance(), handler); Reactor::get_instance()->regist(conn.sock(), down_handler); ... }
服務端工作線程計算模型
我們知道服務端使用多線程進行數據的處理,那么每個線程的工作內容是什么呢?
template<class REQUEST, class RESPONSE> class Processor : public Worker<StreamEvent> { public: virtual int process(REQUEST &request, RESPONSE &response) = 0; void run() { while(true){ StreamEvent e = get_event(); //隊列中獲取待處理事件 char head[4]; Connection conn(e.fd); int payload = conn.recv_n(head, 4); //接收數據長度 if(payload == -1) { close(e.fd); printf("Error Processor close fd:%d\n", e.fd); return; } REQUEST request; RESPONSE response; int size = *((int *)head); char *recv_buf = new char[size]; conn.recv_n(recv_buf, size); //接收請求數據 request.deserialize(recv_buf, size); //反序列化 process(request, response); //進行用戶代碼邏輯計算,由用戶實現 size = response.bytes(); char *send_buf = new char[size + 4]; memcpy(send_buf, &size, sizeof(int)); payload = response.serialize(send_buf + 4, size); //序列化響應數據 conn.send_n(send_buf, size + 4); //發送響應數據 //為了正常關閉該鏈接,需要重新注冊回reactor UpstreamHandler *upHandler = new UpstreamHandler(e.fd, Reactor::get_instance()); Reactor::get_instance()->regist(e.fd, upHandler); delete recv_buf; delete send_buf; } }
virutal ~Processor(){}
}
文章列表