NET下RabbitMQ實踐 [WCF發布篇]
在之前的兩篇文章中,主要介紹了RabbitMQ環境配置,簡單示例的編寫。今天將會介紹如何使用WCF將RabbitMQ列隊以服務的方式進行發布。
注:因為RabbitMQ的官方.net客戶端中包括了WCF的SAMPLE代碼演示,很適合初學者,所以我就偷了個懶,直接對照它的SAMPLE來說明了,算是借花獻佛吧,呵呵。首先我們下載相應源碼(基于.NET 3.0),本文主要對該源碼包中的代碼進行講解,鏈接如下:
Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples
當然官方還提供了基本.NET 2.0 版本的示例版本,但其中只是一些簡單的示例,并不包括WCF部分,這里只發個鏈接,感興趣的朋友可自行研究。
Binary, compiled for .NET 2.0 (zip) - includes example code
下載基于.NET 3.0的版本源碼之后,解壓其中的projects\examples\wcf目錄,可看到如下的項目: 幾個文件夾分別對應如下應用場景:
OneWay: 單向通信(無返回值)
TwoWay: 雙向通信(請求/響應)
Session:會話方式
Duplex: 雙向通信(可以指定一個Callback回調函數)
OneWay
在OneWayTest示例中,演示了插入日志數據,因為日志操作一般只是單純的寫入操作,不考慮返回值,所以使用OneWay方式。下面是其WCF接口聲明和實例代碼,如下:

public interface ILogServiceContract
{
[OperationContract(IsOneWay=true)]
void Log(LogData entry);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class LogService : ILogServiceContract
{
public int m_i;
public void Log(LogData entry)
{
Util.WriteLine(ConsoleColor.Magenta, " [SVC] {3} [{0,-6}] {1, 12}: {2}", entry.Level, entry.TimeStamp, entry.Message, m_i++);
}
}
其只包含一個方法:Log(LogData entry) ---用于添加日志記錄,可以看出它與我們以往寫WCF代碼沒什么兩樣。不過這里要說明一下,在類屬性InstanceContextMode枚舉類型中,使用了“Single”模式,而該枚舉提供了如下三種情況:
Single - 為所有客戶端調用分配一個服務實例。
PerCall – 為每個客戶端調用分配一個服務實例。
PerSession – 為每個客戶端會話分配一個服務實例。每個Session內多線程操作實例的話會有并發問題。
InstanceContextMode 的默認設置為 PerSession。
這三個值通常是要與并發模式(ConcurrencyMode)搭配使用,以解決并發效率,共享資源等復雜場景下的問題的。下面是并發模式的說明:
ConcurrencyMode 控制一次允許多少個線程進入服務。ConcurrencyMode 可以設置為以下值之一:
Single - 一次可以有一個線程進入服務。
Reentrant - 一次可以有一個線程進入服務,但允許回調。
Multiple - 一次可以有多個線程進入服務。
ConcurrencyMode 的默認設置為 Single。
InstanceContextMode 和 ConcurrencyMode 設置會相互影響,因此為了提升并發效能,必須協調這兩項設置。
例如,將 InstanceContextMode 設置為 PerCall 時,會忽略 ConcurrencyMode 設置。這是因為,每個客戶端調用都將路由到新的服務實例,因此一次只會有一個線程在服務實例中運行。對于PerCall的實例模型,每個客戶端請求都會與服務端的一個獨立的服務實例進行交互,就不會出現多個客戶端請求爭用一個服務實例的情況,也就不會出現并發沖突,不會影響吞吐量的問題。但對于實例內部的共享變量(static)還是會可能出現沖突。
但對于當前Single設置,原因很多,可能包括:
1. 創建服務實例需要大量的處理工作。當多個客戶端訪問服務時,僅允許創建一個服務實例可以降低所需處理量。
2. 可以降低垃圾回收成本,因為不必為每個調用創建和銷毀服務創建的對象。
3. 可以在多個客戶端之間共享服務實例。
4. 避免對static靜態屬性的訪問沖突。
但如果使用Single,問題也就出來了---就是性能,因為如果 ConcurrencyMode也同時設置成Single時,當前示例中的(唯一)服務實例不會同時處理多個(單線程客戶端)請求。因為服務在處理請求時會對當前服務加鎖,如果再有其它請求需要該服務處理的時候,需要排隊等候。如果有大量客戶端訪問,這可能會導致較大的瓶頸。
當然如果考慮到多線程客戶端使用的情況,可能問題會更嚴重。
聊了這些,無非就是要結合具體應用場景來靈活搭配ConcurrencyMode,InstanceContextMode這兩個枚舉值。下面言歸正傳,來看一下如何將該服務與RabbitMQ進行綁定,以實現以WCF方式訪問RabbitMQ服務的效果。這里暫且略過LogData數據結構信息類,直接看一下如果綁定服務代碼(位于OneWayTest.cs):

public void StartService(Binding binding)
{
m_host = new ServiceHost(typeof(LogService), new Uri("soap.amqp:///"));
((RabbitMQBinding)binding).OneWayOnly = true;
m_host.AddServiceEndpoint(typeof(ILogServiceContract), binding, "LogService");
m_host.Open();
m_serviceStarted = true;
}
StartService方法的主體與我們平時啟動WCF服務的方法差不多,只不過是將其中的URL協議部分換成了“soap.amqp”形式,而其中的傳入參數binding則是RabbitMQBinding類型,該類型是rabbitmq客戶端類庫提供的用于對應Binding類的RabbitMQBinding實現。下面就是其類實始化代碼:

其包括兩個參數,一個是rabbitmq服務地址,一個是所用的協議,其對應示例app.config文件中的如下結點:
<add key="manual-test-broker-protocol" value="AMQP_0_8"/>
這樣,我們就完成了初始化服務實例工作。接著來構造客戶端代碼,如下:

private ILogServiceContract m_client;
public ILogServiceContract GetClient(Binding binding)
{
((RabbitMQBinding)binding).OneWayOnly = true;
m_factory = new ChannelFactory<ILogServiceContract>(binding, "soap.amqp:///LogService");
m_factory.Open();
return m_factory.CreateChannel();
}
與平時寫的代碼相似,但傳入參數就是上面提到的那個RabbitMQBinding實例,這樣通過下面代碼訪問WCF中的LOG方法:
m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));
m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));
....
到這里,我們可以看出,它的實現還是很簡單的。我們只要把10.0.4.79:5672上的rabbitmq的環境跑起來,就可以看出最終的效果了。 之后我將C#的服務端(startservice)與客戶端(getclient)分開布署到不同IP的主機上,也實現了示例中的結果。
TwoWay
下面介紹一下 TwoWay雙向通信示例,首先是WCF接口聲明和實現:

public interface ICalculator
{
[OperationContract]
int Add(int x, int y);
[OperationContract]
int Subtract(int x, int y);
}
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*為每個客戶端調用分配一個服務實例*/
public sealed class Calculator : ICalculator
{
public int Add(int x, int y)
{
return x + y;
}
public int Subtract(int x, int y)
{
return x - y;
}
}
因為其服務的啟動startservice和客戶端實例構造與oneway方法類似,為了節約篇幅,這時就略過了,下面是其最終調用代碼(位于TwoWayTest.cs):

{
StartService(Program.GetBinding());
ICalculator calc = GetClient(Program.GetBinding());
int result = 0, x = 3, y = 4;
Util.WriteLine(ConsoleColor.Magenta, " {0} + {1} = {2}", x, y, result = calc.Add(x, y));
if (result != x + y)
throw new Exception("Test Failed");
......
}
與普通的WCF TWOWAY 返回調用方式相同,就不多說了。
Session
下面是基于Session會話方式的代碼,WCF接口聲明和實現:

public interface ICart
{
[OperationContract]
void Add(CartItem item);
[OperationContract]
double GetTotal();
Guid Id { [OperationContract]get; }
}
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
public class Cart : ICart
{
public Cart()
{
Items = new List<CartItem>();
m_id = Guid.NewGuid();
}
private Guid m_id;
private List<CartItem> m_items;
private List<CartItem> Items {
get { return m_items; }
set { m_items = value; }
}
public void Add(CartItem item)
{
Items.Add(item);
}
public double GetTotal()
{
double total = 0;
foreach (CartItem i in Items)
total += i.Price;
return total;
}
public Guid Id { get { return m_id; } }
}
該接口實現一個購物車功能,可以添加商品并計算總價,考慮到并發場景,這里將其實例為PerSession枚舉類型,即為每個客戶端會話分配一個服務實例。這樣就可以在用戶點擊購買一件商品里,為其購物車商品列表List<CartItem>添加一條信息,而不會與其它用戶的購物車商品列表相沖突。其最終的調用方法如下:

{
StartService(Program.GetBinding());
ICart cart = GetClient(Program.GetBinding());
AddToCart(cart, "Beans", 0.49);//添加商品到購物車
AddToCart(cart, "Bread", 0.89);
AddToCart(cart, "Toaster", 4.99);
double total = cart.GetTotal();//計算總價
if (total != (0.49 + 0.89 + 4.99))
throw new Exception("Incorrect Total");
......
}
Duplex
最后,再介紹一下如何基于Duplex雙向通信模式進行開發,DuplexTest這是個“PIZZA訂單”的場景,用戶下單之后,等待服務端將PIZZA加工完畢,然后服務端用callback方法通知客戶端PIZZA已做好,相應WCF接口聲明和實現如下:

public interface IPizzaService
{
[OperationContract(IsOneWay=true)]
void PlaceOrder(Order order);
}
[ServiceContract]
public interface IPizzaCallback
{
[OperationContract(IsOneWay=true)]
void OrderReady(Guid id); /*用于通知客戶端*/
}
public class PizzaService : IPizzaService
{
public void PlaceOrder(Order order)
{
foreach (Pizza p in order.Items)
{
Util.WriteLine(ConsoleColor.Magenta, " [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);
}
Util.WriteLine(ConsoleColor.Magenta, " [SVC] Order {0} is Ready!", order.Id);
Callback.OrderReady(order.Id);
}
IPizzaCallback Callback
{
get { return OperationContext.Current.GetCallbackChannel<IPizzaCallback>(); } //當前上下文中調用客戶端綁定的回調方法
}
}
這里要說明的是IPizzaCallback接口的OrderReady方法被綁定了IsOneWay=true屬性,主要是因為如果使用“請求-響應”模式,客戶端必須等服務端“響應”完成上一次“請求”后才能發出下一步“請求”。因此雖然客戶端可以使用多線程方式來調用服務,但最后的執行結果仍然表現出順序處理(效率低)。要想使服務端能夠并行處理客戶端請求的話,那我們就不能使用“請求-響應”的調用模式,所以這里使用One-Way的方式來調用服務。
下面是客戶端回調接口實現:

{
public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)
: base(context, binding, remoteAddress) { }
public void PlaceOrder(Order order)
{
Channel.PlaceOrder(order);
}
}
最終客戶端實例化(startservice)略過,因與之前示例類似。

{
PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));
client.Open();
return client;
}
上面的方法中將當前客戶端實例this(實現了IServiceTest<IPizzaService>, IPizzaCallback接口)注冊到上下文中,目的是為了將其方法的回傳調用傳遞到服務端(還記得服務端的這行代碼嗎?=>Callback.OrderReady(order.Id))
{
Util.WriteLine(ConsoleColor.Magenta, " [CLI] Order {0} has been delivered.",id);
mre.Set();
}
這樣,服務端完成pizza時,就可以調用客戶端的OrderReady方法來實現通知功能了。下面就是一個整個的下單流程實現:

{
......
StartService(Program.GetBinding());
IPizzaService client = GetClient(Program.GetBinding());
Order lunch = new Order();
lunch.Items = new List<Pizza>();
lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));
client.PlaceOrder(lunch);
......
}
好了,今天的主要內容就先到這里了,在接下來的文章中,將會介紹一個rabbitmq的實際應用場景,也是我們Discuz!NT企業版中的一個功能:記錄系統運行的錯誤日志。