基于消息與.Net Remoting的分布式處理架構

作者: 張逸  來源: 博客園  發布時間: 2010-08-18 11:13  閱讀: 2025 次  推薦: 0   原文鏈接   [收藏]  
 
摘要:分布式處理在大型企業應用系統中,最大的優勢是將負載分布。通過多臺服務器處理多個任務,以優化整個系統的處理能力和運行效率。分布式處理的技術核心是完成服務與服務之間、服務端與客戶端之間的通信。
[1] 基于消息與.Net Remoting的分布式處理架構
[2] 基于消息與.Net Remoting的分布式處理架構

      分布式處理在大型企業應用系統中,最大的優勢是將負載分布。通過多臺服務器處理多個任務,以優化整個系統的處理能力和運行效率。分布式處理的技術核心是完成服務與服務之間、服務端與客戶端之間的通信。在.Net 1.1中,可以利用Web Service或者.Net Remoting來實現服務進程之間的通信。本文將介紹一種基于消息的分布式處理架構,利用了.Net Remoting技術,并參考了CORBA Naming Service的處理方式,且定義了一套消息體制,來實現分布式處理。 

   一、消息的定義

       要實現進程間的通信,則通信內容的載體——消息,就必須在服務兩端具有統一的消息標準定義。從通信的角度來看,消息可以分為兩類:Request Messge和Reply Message。為簡便起見,這兩類消息可以采用同樣的結構。

       消息的主體包括ID,Name和Body,我們可以定義如下的接口方法,來獲得消息主體的相關屬性:

 
public interface IMessage:ICloneable
{
IMessageItemSequence GetMessageBody();

string GetMessageID();
string GetMessageName();
void SetMessageBody(IMessageItemSequence aMessageBody);
void SetMessageID(string aID);
void SetMessageName(string aName);
}

    消息主體類Message實現了IMessage接口。在該類中,消息體Body為IMessageItemSequence類型。這個類型用于Get和Set消息的內容:Value和Item:

 
public interface IMessageItemSequence:ICloneable
{
IMessageItem GetItem(
string aName);
void SetItem(string aName,IMessageItem aMessageItem);
string GetValue(string aName);
void SetValue(string aName,string aValue);
}

       Value為string類型,并利用HashTable來存儲Key和Value的鍵值對。而Item則為IMessageItem類型,同樣的在IMessageItemSequence的實現類中,利用HashTable存儲了Key和Item的鍵值對。

       IMessageItem支持了消息體的嵌套。它包含了兩部分:SubValue和SubItem。實現的方式和IMessageItemSequence相似。定義這樣的嵌套結構,使得消息的擴展成為可能。一般的結構如下:

       IMessage——Name
                     ——ID
                     ——Body(IMessageItemSequence)
                            ——Value
                            ——Item(IMessageItem)
                                   ——SubValue
                                   ——SubItem(IMessageItem)
                                          ——……

       各個消息對象之間的關系如下:

Distribute1.gif


       在實現服務進程通信之前,我們必須定義好各個服務或各個業務的消息格式。通過消息體的方法在服務的一端設置消息的值,然后發送,并在服務的另一端獲得這些值。例如發送消息端定義如下的消息體:

 
IMessageFactory factory = new MessageFactory();
IMessageItemSequence body
= factory.CreateMessageItemSequence();
body.SetValue(
"name1","value1");
body.SetValue(
"name2","value2");
IMessageItem item
= factory.CreateMessageItem();
item.SetSubValue(
"subname1","subvalue1");
item.SetSubValue(
"subname2","subvalue2");
IMessageItem subItem1
= factory.CreateMessageItem();
subItem1.SetSubValue(
"subsubname11","subsubvalue11");
subItem1.SetSubValue(
"subsubname12","subsubvalue12");
IMessageItem subItem2
= factory.CreateMessageItem();
subItem1.SetSubValue(
"subsubname21","subsubvalue21");
subItem1.SetSubValue(
"subsubname22","subsubvalue22");
item.SetSubItem(
"subitem1",subItem1);
item.SetSubItem(
"subitem2",subItem2);
body.SetItem(
"item",item);
//Send Request Message
MyServiceClient service = new MyServiceClient("Client");
IMessageItemSequence reply
= service.SendRequest("TestService","Test1",body);

       在接收消息端就可以通過獲得body的消息體內容,進行相關業務的處理。 

   二、.Net Remoting服務

       在.Net中要實現進程間的通信,主要是應用Remoting技術。根據前面對消息的定義可知,實際上服務的實現,可以認為是對消息的處理。因此,我們可以對服務進行抽象,定義接口IService:

 
public interface IService
{
IMessage Execute(IMessage aMessage);
}

       Execute()方法接受一條Request Message,對其進行處理后,返回一條Reply Message。在整個分布式處理架構中,可以認為所有的服務均實現該接口。但受到Remoting技術的限制,如果要實現服務,則該服務類必須繼承自MarshalByRefObject,同時必須在服務端被Marshal。隨著服務類的增多,必然要在服務兩端都要對這些服務的信息進行管理,這加大了系統實現的難度與管理的開銷。如果我們從另外一個角度來分析服務的性質,基于消息處理而言,所有服務均是對Request Message的處理。我們完全可以定義一個Request服務負責此消息的處理。

       然而,Request服務處理消息的方式雖然一致,但畢竟服務實現的業務,即對消息處理的具體實現,卻是不相同的。對我們要實現的服務,可以分為兩大類:業務服務與Request服務。實現的過程為:首先,具體的業務服務向Request服務發出Request請求,Request服務偵聽到該請求,然后交由其偵聽的服務來具體處理。

       業務服務均具有發出Request請求的能力,且這些服務均被Request服務所偵聽,因此我們可以為業務服務抽象出接口

 
IListenService:
public interface IListenService
{
IMessage OnRequest(IMessage aMessage);
}
//Request服務實現了IService接口,
//并包含IListenService類型對象的委派,以執行OnRequest()方法:

public class RequestListener:MarshalByRefObject,IService
{

public RequestListener(IListenService listenService)
{
m_ListenService
= listenService;
}

private IListenService m_ListenService;
#region IService Members
public IMessage Execute(IMessage aMessage)
{

return this.m_ListenService.OnRequest(aMessage);
}

#endregion
public override object InitializeLifetimeService()
{

return null;
}
}

       在RequestListener服務中,繼承了MarshalByRefObject類,同時實現了IService接口。通過該類的構造函數,接收IListService對象。

       由于Request消息均由Request服務即RequestListener處理,因此,業務服務的類均應包含一個RequestListener的委派,唯一的區別是其服務名不相同。業務服務類實現IListenService接口,但不需要繼承MarshalByRefObject,因為被Marshal的是該業務服務內部的RequestListener對象,而非業務服務本身:

 
public abstract class Service:IListenService
{

public Service(string serviceName)
{
m_ServiceName
= serviceName;
m_RequestListener
= new RequestListener(this);
}

#region IListenService Members
public IMessage OnRequest(IMessage aMessage)
{

//……
}
#endregion
private string m_ServiceName;
private RequestListener m_RequestListener;
}

       Service類是一個抽象類,所有的業務服務均繼承自該類。最后的服務架構如下:

Distribute2.gif


       我們還需要在Service類中定義發送Request消息的行為,通過它,才能使業務服務被RequestListener所偵聽。

 
public IMessageItemSequence SendRequest(string aServiceName,
string aMessageName,IMessageItemSequence aMessageBody)
{
IMessage message
= m_Factory.CreateMessage();
message.SetMessageName(aMessageName);
message.SetMessageID(
"");
message.SetMessageBody(aMessageBody);
IService service
= FindService(aServiceName);
IMessageItemSequence replyBody
= m_Factory.CreateMessageItemSequence();
if (service != null)
{
IMessage replyMessage
= service.Execute(message);
replyBody
= replyMessage.GetMessageBody();
}

else
{
replyBody.SetValue(
"result","Failure");
}

return replyBody;
}

       注意SendRequest()方法的定義,其參數包括服務名,消息名和被發送的消息主體。而在實現中最關鍵的一點是FindService()方法。我們要查找的服務正是與之對應的RequestListener服務。不過,在此之前,我們還需要先將服務Marshal:    

 
public void Initialize()
{
RemotingServices.Marshal(
this.m_RequestListener,
this.m_ServiceName + ".RequestListener");
}

       我們Marshal的對象,是業務服務中的Request服務對象m_RequestListener,這個對象在Service的構造函數中被實例化:

 
m_RequestListener = new RequestListener(this);

       注意,在實例化的時候是將this作為IListenService對象傳遞給RequestListener。因此,此時被Marshal的服務對象,保留了業務服務本身即Service的指引。可以看出,在Service和RequestListener之間,采用了“雙重委派”的機制。

       通過調用Initialize()方法,初始化了一個服務對象,其類型為RequestListener(或IService),其服務名為:Service的服務名 + ".RequestListener"。而該服務正是我們在SendRequest()方法中要查找的Service:

 
IService service = FindService(aServiceName);

       下面我們來看看FindService()方法的實現:

 
protected IService FindService(string aServiceName)
{

lock (this.m_Services)
{
IService service
= (IService)m_Services[aServiceName];
if (service != null)
{

return service;
}

else
{
IService tmpService
= GetService(aServiceName);
AddService(aServiceName,tmpService);

return tmpService;
}
}
}

       可以看到,這個服務是被添加到m_Service對象中,該對象為SortedList類型,服務名為Key,IService對象為Value。如果沒有找到,則通過私有方法GetService()來獲得:

 
private IService GetService(string aServiceName)
{
IService service
= (IService)Activator.GetObject(typeof(RequestListener),
"tcp://localhost:9090/" + aServiceName + ".RequestListener");
return service;
}

       在這里,Channel、IP、Port應該從配置文件中獲取,為簡便起見,這里直接賦為常量。

       再分析SendRequest方法,在找到對應的服務后,執行了IService的Execute()方法。此時的IService為RequestListener,而從前面對RequestListener的定義可知,Execute()方法執行的其實是其偵聽的業務服務的OnRequest()方法。

       我們可以定義一個具體的業務服務類,來分析整個消息傳遞的過程。該類繼承于Service抽象類:

 
public class MyService:Service
{

public MyService(string aServiceName):base(aServiceName)
{}
}

       假設把進程分為服務端和客戶端,那么對消息處理的步驟如下:

 1、 在客戶端調用MyService的SendRequest()方法發送Request消息;
 2、 查找被Marshal的服務,即RequestListener對象,此時該對象應包含對應的業務服務對象MyService;
 3、 在服務端調用RequestListener的Execute()方法。該方法則調用業務服務MyService的OnRequest()方法。

在這些步驟中,除了第一步在客戶端執行外,其他的步驟均是在服務端進行。

[第1頁][第2頁]
0
0
 
 
 

文章列表

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()