文章出處

前言


本篇文章講述客戶端與服務端的具體設計細節。有細心的小伙伴發現,客戶端和服務端的工作方式不一樣:服務端是多線程計算模型,利用工作線程完成數據的讀取,而客戶端是單線程(利用Reactor線程完成數據的讀取)。這么做的原因有二:首先我們認為我們的使用RPC的初衷是由于CPU計算是瓶頸,不得已把計算放到多臺機器上,所以服務端采用多線程計算模型;其次我們認為網絡IO只要不是客戶端故意阻塞,那么無論是請求數據還是響應數據只需要一次接收就可以收全,不會有線程長時間阻塞在網絡上,所以客戶端就使用反應器線程進行接收響應數據。

客戶端同步和異步調用


SimpleRpc提供了同步調用和異步調用的方法,使用區別在于傳遞的參數不同,如下所示。

    //異步請求
    int async_request(Server &server, Request *request, Response *response, ResultHandler *handler);                 
    //同步請求
    int sync_request(Server &server, Request *request, Response *response);

那么SimpleRpc對于同步和異步調用是如何支持的呢?我們重新看一下DownstreamHandler對數據的處理方式:

void DownstreamHandler::handle_read(int fd) {                                                                   
  char head[4];
  Connection conn(fd);
  conn.recv_n(head, 4);
  int size = *((int *)head);
  char *buf = new char[size];                                                                                        
  conn.recv_n(buf, size);                                                                                            
  close(fd);  
  printf("Downstream Handler close fd:%d\n", fd);                                                                    
  //下游響應
  _response->deserialize(buf, size);                                                                                 
  //如果有result_handler,則調用data_comeback鉤子函數
  if(_result_handler != NULL) {
    _result_handler->data_comeback();         //對于同步調用,這個方法會喚醒客戶端使其從wait中返回                                                                       
  }                                                                                                                  
  
  delete[] buf;                                                                                                      
  //自殺 
  delete this;                                                                                                       
}

result_handler的調用是關鍵,我們正是利用這一點做到同步調用和異步調用。ResultHandler的類UML如下:

DefaultResultHandler是SimpleRpc的默認結果處理方式,UserDefinedResultHandler由用戶自己選擇性的定義并實現。當客戶端工作線程對服務端相應數據處理完畢后,調用ResultHandler的data_comeback方法執行這個鉤子函數。

  • 同步調用的實現:
int SimpleRpcClient::sync_request(Server &server, Request *request, Response *response) {
  Mutex mutex;
  Connection conn;
  Condition cond(&mutex);
  InetAddr addr(server.get_port_str(), server.get_ip_str());  
  Connector conntor(addr);
  int ret = conntor.Connect(conn);  //建立與服務端的連接
  if(ret == -1){
    LOG("connect error\n");
    return -1;
  }
  int size = request->bytes();      //獲取請求序列化后的字節數
  char *buf = new char[size + 4];   //用額外4字節存放數據長度,方便接收端校驗
  if(buf == NULL) {
    LOG("request oom, request need %d bytes\n", size + 4);
    conn.Close();
    return -1;
  }
  int payload = request->serialize(buf + 4, size);  //序列化
  memcpy(buf, &payload, sizeof(int));
  ret = conn.send_n(buf, payload + 4);  //發送序列化數據
  if(ret != 0) {
    LOG("connection send error\n");
    return -1;
  }
  DefaultResultHandler *handler = new DefaultResultHandler(&cond, &mutex);
  DownstreamHandler *down_handler =
    new DownstreamHandler(conn.sock(), response, Reactor::get_instance(), handler);
  Reactor::get_instance()->regist(conn.sock(), down_handler);  //注冊到reactor中等待響應事件的通知
  handler->finish();        //阻塞調用,直到cond得到喚醒通知
  delete[] buf;
  delete handler;
  return 0;
}

我們的DefautlResultHandler擁有系統等待條件(Condition),并且作為DownstreamHandler的成員之一。客戶端發送請求數據后,構造DownstreamHandler并注冊到reactor中,等待服務端響應事件的通知。干完以上的事情之后,客戶端應用線程調用DefaultResultHandler的finish方法阻塞直到得到完成通知,這樣達到了同步調用的效果。

  • 異步調用的實現:

異步調用沒有使用DefaultResultHandler作為參數傳遞給DownstreamHandler,而是把用戶自定義的ResultHanlder傳遞進去,具體的控制流程(data_comeback函數)由用戶自己定義。

int SimpleRpcClient::async_request(
  Server &server, Request *request, Response *response, ResultHandler *handler) {
  ...  
  DownstreamHandler *down_handler =
    new DownstreamHandler(conn.sock(), response, Reactor::get_instance(), handler);
  Reactor::get_instance()->regist(conn.sock(), down_handler);
  ...
}

 服務端工作線程計算模型


我們知道服務端使用多線程進行數據的處理,那么每個線程的工作內容是什么呢?

template<class REQUEST, class RESPONSE>                                                                                       
class Processor : public Worker<StreamEvent> {                                                                                
  public:                                                                                                                     
    virtual int process(REQUEST &request, RESPONSE &response) = 0;                                                            
                                                                                                                              
    void run() {                                                                                                              
      while(true){                                                                                                            
        StreamEvent e = get_event();      //隊列中獲取待處理事件                                                                                    
        char head[4];                                                                                                       
        Connection conn(e.fd);                                                                                              
        int payload = conn.recv_n(head, 4);     //接收數據長度                                                                            
        if(payload == -1) {                                                                                                 
          close(e.fd);                                                                                                      
          printf("Error Processor close fd:%d\n", e.fd);                                                                    
          return;                                                                                                           
        }                                                                                                                   
                                                                                                                              
        REQUEST request;                                                                                                    
        RESPONSE response;                                                                                                  
                                                                                                                              
        int size = *((int *)head);                                                                                          
        char *recv_buf = new char[size];                                                                                    
        conn.recv_n(recv_buf, size);              //接收請求數據                                                                         
        request.deserialize(recv_buf, size);      //反序列化                                                                          
                                                                                                                              
        process(request, response);               //進行用戶代碼邏輯計算,由用戶實現                                                                            
        size = response.bytes();                                                                                            
        char *send_buf = new char[size + 4];                                                                                
        memcpy(send_buf, &size, sizeof(int));                                                                               
        payload = response.serialize(send_buf + 4, size);      //序列化響應數據                                                             
        conn.send_n(send_buf, size + 4);                       //發送響應數據                                                                 
        //為了正常關閉該鏈接,需要重新注冊回reactor                                                                         
        UpstreamHandler *upHandler = new UpstreamHandler(e.fd, Reactor::get_instance());
        Reactor::get_instance()->regist(e.fd, upHandler);
        delete recv_buf;
        delete send_buf;
      }
    }
virutal ~Processor(){}
}

 


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()