NET下RabbitMQ實踐 [WCF發布篇]

作者: 代震軍  來源: 博客園  發布時間: 2010-10-24 22:44  閱讀: 1656 次  推薦: 0   原文鏈接   [收藏]  

  在之前的兩篇文章中,主要介紹了RabbitMQ環境配置,簡單示例的編寫。今天將會介紹如何使用WCF將RabbitMQ列隊以服務的方式進行發布。
  注:因為RabbitMQ的官方.net客戶端中包括了WCF的SAMPLE代碼演示,很適合初學者,所以我就偷了個懶,直接對照它的SAMPLE來說明了,算是借花獻佛吧,呵呵。首先我們下載相應源碼(基于.NET 3.0),本文主要對該源碼包中的代碼進行講解,鏈接如下:   
  Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples
  當然官方還提供了基本.NET 2.0 版本的示例版本,但其中只是一些簡單的示例,并不包括WCF部分,這里只發個鏈接,感興趣的朋友可自行研究。   
  Binary, compiled for .NET 2.0 (zip) - includes example code      
  下載基于.NET 3.0的版本源碼之后,解壓其中的projects\examples\wcf目錄,可看到如下的項目:      幾個文件夾分別對應如下應用場景:
  OneWay: 單向通信(無返回值)
  TwoWay: 雙向通信(請求/響應)
  Session:會話方式
  Duplex: 雙向通信(可以指定一個Callback回調函數)
  OneWay  
  在OneWayTest示例中,演示了插入日志數據,因為日志操作一般只是單純的寫入操作,不考慮返回值,所以使用OneWay方式。下面是其WCF接口聲明和實例代碼,如下:      

    [ServiceContract]
    
public interface ILogServiceContract
    {
        [OperationContract(IsOneWay
=true)]
        
void Log(LogData entry);
    }
   
    [ServiceBehavior(InstanceContextMode 
= InstanceContextMode.Single)]
    
public class LogService : ILogServiceContract
    {
        
public int m_i;
        
public void Log(LogData entry)
        {
            Util.WriteLine(ConsoleColor.Magenta, 
"  [SVC] {3} [{0,-6}] {1, 12}: {2}", entry.Level, entry.TimeStamp, entry.Message, m_i++);
        }
    }

  其只包含一個方法:Log(LogData entry) ---用于添加日志記錄,可以看出它與我們以往寫WCF代碼沒什么兩樣。不過這里要說明一下,在類屬性InstanceContextMode枚舉類型中,使用了“Single”模式,而該枚舉提供了如下三種情況:       
  Single - 為所有客戶端調用分配一個服務實例。
  PerCall – 為每個客戶端調用分配一個服務實例。
  PerSession – 為每個客戶端會話分配一個服務實例。每個Session內多線程操作實例的話會有并發問題。       
  InstanceContextMode 的默認設置為 PerSession
  這三個值通常是要與并發模式(ConcurrencyMode)搭配使用,以解決并發效率,共享資源等復雜場景下的問題的。下面是并發模式的說明:
  ConcurrencyMode 控制一次允許多少個線程進入服務。ConcurrencyMode 可以設置為以下值之一:   
  Single - 一次可以有一個線程進入服務。
  Reentrant - 一次可以有一個線程進入服務,但允許回調。
  Multiple - 一次可以有多個線程進入服務。       
  ConcurrencyMode 的默認設置為 Single。   
  InstanceContextMode 和 ConcurrencyMode 設置會相互影響,因此為了提升并發效能,必須協調這兩項設置。   
  例如,將 InstanceContextMode 設置為 PerCall 時,會忽略 ConcurrencyMode 設置。這是因為,每個客戶端調用都將路由到新的服務實例,因此一次只會有一個線程在服務實例中運行。對于PerCall的實例模型,每個客戶端請求都會與服務端的一個獨立的服務實例進行交互,就不會出現多個客戶端請求爭用一個服務實例的情況,也就不會出現并發沖突,不會影響吞吐量的問題。但對于實例內部的共享變量(static)還是會可能出現沖突。
  但對于當前Single設置,原因很多,可能包括:
  1. 創建服務實例需要大量的處理工作。當多個客戶端訪問服務時,僅允許創建一個服務實例可以降低所需處理量。
  2. 可以降低垃圾回收成本,因為不必為每個調用創建和銷毀服務創建的對象。
  3. 可以在多個客戶端之間共享服務實例。
  4. 避免對static靜態屬性的訪問沖突。   
  但如果使用Single,問題也就出來了---就是性能,因為如果 ConcurrencyMode也同時設置成Single時,當前示例中的(唯一)服務實例不會同時處理多個(單線程客戶端)請求。因為服務在處理請求時會對當前服務加鎖,如果再有其它請求需要該服務處理的時候,需要排隊等候。如果有大量客戶端訪問,這可能會導致較大的瓶頸。
  當然如果考慮到多線程客戶端使用的情況,可能問題會更嚴重。 
  聊了這些,無非就是要結合具體應用場景來靈活搭配ConcurrencyMode,InstanceContextMode這兩個枚舉值。下面言歸正傳,來看一下如何將該服務與RabbitMQ進行綁定,以實現以WCF方式訪問RabbitMQ服務的效果。這里暫且略過LogData數據結構信息類,直接看一下如果綁定服務代碼(位于OneWayTest.cs):   

private ServiceHost m_host;
    

public void StartService(Binding binding)
{

    m_host 
= new ServiceHost(typeof(LogService), new Uri("soap.amqp:///"));
    ((RabbitMQBinding)binding).OneWayOnly 
= true;    
    m_host.AddServiceEndpoint(
typeof(ILogServiceContract), binding, "LogService");
    m_host.Open();
    m_serviceStarted 
= true;  
}

  StartService方法的主體與我們平時啟動WCF服務的方法差不多,只不過是將其中的URL協議部分換成了“soap.amqp”形式,而其中的傳入參數binding則是RabbitMQBinding類型,該類型是rabbitmq客戶端類庫提供的用于對應Binding類的RabbitMQBinding實現。下面就是其類實始化代碼:   

     return new RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings["manual-test-broker-uri"],RabbitMQ.Client.Protocols.FromConfiguration("manual-test-broker-protocol"));

    其包括兩個參數,一個是rabbitmq服務地址,一個是所用的協議,其對應示例app.config文件中的如下結點:    

<add key="manual-test-broker-uri" value="amqp://10.0.4.79:5672/"/> <!--本系列第一篇中的環境設置-->
<add key="manual-test-broker-protocol" value="AMQP_0_8"/>

  這樣,我們就完成了初始化服務實例工作。接著來構造客戶端代碼,如下:   

private ChannelFactory<ILogServiceContract> m_factory;

private ILogServiceContract m_client;                          
    
public ILogServiceContract GetClient(Binding binding)
{
    ((RabbitMQBinding)binding).OneWayOnly = true;
    m_factory = new ChannelFactory
<ILogServiceContract>(binding, "soap.amqp:///LogService");
    m_factory.Open();
    return m_factory.CreateChannel();
}

  與平時寫的代碼相似,但傳入參數就是上面提到的那個RabbitMQBinding實例,這樣通過下面代碼訪問WCF中的LOG方法:

    m_client = GetClient(Program.GetBinding());
    m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));
    m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));
    ....

  到這里,我們可以看出,它的實現還是很簡單的。我們只要把10.0.4.79:5672上的rabbitmq的環境跑起來,就可以看出最終的效果了。 之后我將C#的服務端(startservice)與客戶端(getclient)分開布署到不同IP的主機上,也實現了示例中的結果。   
  TwoWay   
  下面介紹一下 TwoWay雙向通信示例,首先是WCF接口聲明和實現:     

    [ServiceContract]
    public interface ICalculator
    {
        [OperationContract]
        int Add(int x, int y);
        
        [OperationContract]
        int Subtract(int x, int y);
    }
    
   [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*為每個客戶端調用分配一個服務實例*/
    public sealed class Calculator : ICalculator
    {
        public int Add(int x, int y)
        {
            return x + y;
        }

        public int Subtract(int x, int y)
        {
            return x - y;
        }
    }

  因為其服務的啟動startservice和客戶端實例構造與oneway方法類似,為了節約篇幅,這時就略過了,下面是其最終調用代碼(位于TwoWayTest.cs):

  public void Run()
 {
     StartService(Program.GetBinding());

     ICalculator calc = GetClient(Program.GetBinding());

     int result = 0, x = 3, y = 4;
     Util.WriteLine(ConsoleColor.Magenta, "  {0} + {1} = {2}", x, y, result = calc.Add(x, y));
     if (result != x + y)
         throw new Exception("Test Failed");

    ......
 }   

  與普通的WCF TWOWAY 返回調用方式相同,就不多說了。   
  Session   
  下面是基于Session會話方式的代碼,WCF接口聲明和實現: 

    [ServiceContract(SessionMode= SessionMode.Required)]
    public interface ICart
    {
        [OperationContract]
        void Add(CartItem item);

        [OperationContract]
        double GetTotal();
        
        Guid Id { [OperationContract]get; }
    }
    
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class Cart : ICart
    {
        public Cart()
        {
            Items = new List
<CartItem>();
            m_id = Guid.NewGuid();
        }
        
        private Guid m_id;
        private List
<CartItem> m_items;

        private List
<CartItem> Items {
            get { return m_items; }
            set { m_items = value; }
        }

        public void Add(CartItem item)
        {
            Items.Add(item);
        }
        
        public double GetTotal()
        {
            double total = 0;
            foreach (CartItem i in Items)
                total += i.Price;

            return total;
        }

        public Guid Id { get { return m_id; } }
    }

  該接口實現一個購物車功能,可以添加商品并計算總價,考慮到并發場景,這里將其實例為PerSession枚舉類型,即為每個客戶端會話分配一個服務實例。這樣就可以在用戶點擊購買一件商品里,為其購物車商品列表List<CartItem>添加一條信息,而不會與其它用戶的購物車商品列表相沖突。其最終的調用方法如下:   

public void Run()
{
    StartService(Program.GetBinding());

    ICart cart = GetClient(Program.GetBinding());

    AddToCart(cart, "Beans", 0.49);//添加商品到購物車
    AddToCart(cart, "Bread", 0.89);
    AddToCart(cart, "Toaster", 4.99);

    double total = cart.GetTotal();//計算總價
    if (total != (0.49 + 0.89 + 4.99))
        throw new Exception("Incorrect Total");
    ......
}

  Duplex   
  最后,再介紹一下如何基于Duplex雙向通信模式進行開發,DuplexTest這是個“PIZZA訂單”的場景,用戶下單之后,等待服務端將PIZZA加工完畢,然后服務端用callback方法通知客戶端PIZZA已做好,相應WCF接口聲明和實現如下: 

   [ServiceContract(CallbackContract=typeof(IPizzaCallback))] /*綁定回調接口*/
    public interface IPizzaService
    {
        [OperationContract(IsOneWay=true)]
        void PlaceOrder(Order order);
    }

    [ServiceContract]
    public interface IPizzaCallback
    {
        [OperationContract(IsOneWay=true)]
        void OrderReady(Guid id); /*用于通知客戶端*/
    }    
    
    public class PizzaService : IPizzaService
    {
        public void PlaceOrder(Order order)
        {
            foreach (Pizza p in order.Items)
            {
                Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);
            }
            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Order {0} is Ready!", order.Id);

            Callback.OrderReady(order.Id);
        }

        IPizzaCallback Callback
        {
            get { return OperationContext.Current.GetCallbackChannel
<IPizzaCallback>(); } //當前上下文中調用客戶端綁定的回調方法
        }
    }

  這里要說明的是IPizzaCallback接口的OrderReady方法被綁定了IsOneWay=true屬性,主要是因為如果使用“請求-響應”模式,客戶端必須等服務端“響應”完成上一次“請求”后才能發出下一步“請求”。因此雖然客戶端可以使用多線程方式來調用服務,但最后的執行結果仍然表現出順序處理(效率低)。要想使服務端能夠并行處理客戶端請求的話,那我們就不能使用“請求-響應”的調用模式,所以這里使用One-Way的方式來調用服務。
  下面是客戶端回調接口實現:      

    public class PizzaClient : DuplexClientBase<IPizzaService>, IPizzaService
    {
        public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)
            : base(context, binding, remoteAddress) { }

        public void PlaceOrder(Order order)
        {
            Channel.PlaceOrder(order);
        }
    }

  最終客戶端實例化(startservice)略過,因與之前示例類似。   

    public IPizzaService GetClient(Binding binding)
    {
        PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));
        client.Open();
        return client;
    }

  上面的方法中將當前客戶端實例this(實現了IServiceTest<IPizzaService>, IPizzaCallback接口)注冊到上下文中,目的是為了將其方法的回傳調用傳遞到服務端(還記得服務端的這行代碼嗎?=>Callback.OrderReady(order.Id))

public void OrderReady(Guid id)
{
    Util.WriteLine(ConsoleColor.Magenta, "  [CLI] Order {0} has been delivered.",id);
    mre.Set();
}

  這樣,服務端完成pizza時,就可以調用客戶端的OrderReady方法來實現通知功能了。下面就是一個整個的下單流程實現:

public void Run()
{
       ......
       StartService(Program.GetBinding());

       IPizzaService client = GetClient(Program.GetBinding());
       Order lunch = new Order();
       lunch.Items = new List
<Pizza>();
       lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));
       client.PlaceOrder(lunch);
       ......
}

  好了,今天的主要內容就先到這里了,在接下來的文章中,將會介紹一個rabbitmq的實際應用場景,也是我們Discuz!NT企業版中的一個功能:記錄系統運行的錯誤日志。     

0
0
 
標簽:.NET RabbitMQ
 
 

文章列表

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()