NET 下RabbitMQ實踐 [實戰篇]
之前的文章中,介紹了如何將RabbitMQ以WCF方式進行發布。今天就介紹一下我們產品中如何使用RabbitMQ的!
在Discuz!NT企業版中,提供了對HTTP錯誤日志的記錄功能,這一點對企業版非常重要,另外存儲錯誤日志使用了MongoDB,理由很簡單,MongoDB的添加操作飛快,即使數量過億之后插入速度依舊不減。
在開始正文之前,先說明一下本文的代碼分析順序,即:程序入口==》RabbitMQ客戶端===>RabbitMQ服務端。好了,閑話少說,開始正文!
首先是程序入口,也就是WCF+RabbitMQ客戶端實現:因為Discuz!NT使用了HttpModule方式來接管HTTP鏈接請求,而在.NET的HttpModule模板中,可以通過如下方法來接管程序運行時發生的ERROR,如下:
context.Error += new EventHandler(Application_OnError);
{
string requestUrl = DNTRequest.GetUrl();
HttpApplication application = (HttpApplication)sender;
HttpContext context = application.Context;#if EntLib
if (RabbitMQConfigs.GetConfig() != null && RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)//當開啟errlog錯誤日志記錄功能時
{
RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(new HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString()));//異步方式
//RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!"));//同步方式
return;
}
#endif
...
}
當然從代碼可以看出,記錄日志的工作基本是通過配置文件控制的,即“HttpModuleErrLog.Enable”。而RabbitMQClientHelper是一個封裝類,主要用于反射生成IHttpModuleErrlogClient接口實例,該實例就是“基于WCF發布的RabbitMQ”的客戶端訪問對象。
/// RabbitMQ
/// </summary>
public class RabbitMQClientHelper
{
static IHttpModuleErrlogClient ihttpModuleErrLogClient;
private static object lockHelper = new object();
public static IHttpModuleErrlogClient GetHttpModuleErrLogClient()
{
if (ihttpModuleErrLogClient == null)
{
lock (lockHelper)
{
if (ihttpModuleErrLogClient == null)
{
try
{
if (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)
{
ihttpModuleErrLogClient = (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(
"Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client", false, true));
}
}
catch
{
throw new Exception("請檢查 Discuz.EntLib.RabbitMQ.dll 文件是否被放置到了bin目錄下!");
}
}
}
}
return ihttpModuleErrLogClient;
}
}
可以看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient對象(注:使用反射的原因主要是解決企業版代碼與普遍版代碼在項目引用上的相互依賴),下面就是其接口和具體要求實現:
/// IHttpModuleErrlogClient 客戶端接口類,用于反射實例化綁定
/// </summary>
public interface IHttpModuleErrlogClient
{
void AddLog(HttpModuleErrLogData httpModuleErrLogData);
void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);
}
public class HttpModuleErrLogClient : IHttpModuleErrlogClient
{
public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
{
try
{
//((RabbitMQBinding)binding).OneWayOnly = true;
ChannelFactory<IHttpModuleErrLogService> m_factory = new ChannelFactory<IHttpModuleErrLogService>(GetBinding(), "soap.amqp:///HttpModuleErrLogService");
m_factory.Open();
IHttpModuleErrLogService m_client = m_factory.CreateChannel();
m_client.AddLog(httpModuleErrLogData);
((IClientChannel)m_client).Close();
m_factory.Close();
}
catch (System.Exception e)
{
string msg = e.Message;
}
}
private delegate void delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);
public void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)
{
delegateAddLog AddLog_aysncallback = new delegateAddLog(AddLog);
AddLog_aysncallback.BeginInvoke(httpModuleErrLogData, null, null);
}
public Binding GetBinding()
{
return new RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);
}
}
可以看出,AddLog方法與上一篇中的客戶端內容基本上沒什么太大差別,只不過它提供了同步和異步訪問兩種方式,這樣做的目的主要是用戶可根據生產環境來靈活配置。
下面就來看一下RabbitMQ的服務端實現,首先看一下其運行效果,如下圖:
接著看一下啟動rabbitmq服務的代碼:
{
m_host = new ServiceHost(typeof(HttpModuleErrLogService), new Uri("soap.amqp:///"));
//((RabbitMQBinding)binding).OneWayOnly = true;
m_host.AddServiceEndpoint(typeof(IHttpModuleErrLogService), binding, "HttpModuleErrLogService");
m_host.Open();
m_serviceStarted = true;
}
上面代碼會添加IHttpModuleErrLogService接口實現類HttpModuleErrLogService 的Endpoint,并啟動它,下面就是該接口聲明:
/// IHttpModuleErrLogService 接口類
/// </summary>
[ServiceContract]
public interface IHttpModuleErrLogService
{
/// <summary>
/// 添加 httpModuleErrLogData日志信息
/// </summary>
/// <param name="httpModuleErrLogData"></param>
[OperationContract]
void AddLog(HttpModuleErrLogData httpModuleErrLogData);
}
代碼很簡單,就是定義了一個添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)
下面就是接口的具體實現,首先是類聲明及初始化代碼:
public class HttpModuleErrLogService : IHttpModuleErrLogService
{
/// <summary>
/// 獲取 HttpModuleErrLogInfo配置文件對象實例
/// </summary>
private static HttpModuleErrLogInfo httpModuleErrorLogInfo = RabbitMQConfigs.GetConfig().HttpModuleErrLog;
/// <summary>
/// 定時器對象
/// </summary>
private static System.Timers.Timer _timer;
/// <summary>
/// 定時器的時間
/// </summary>
private static int _elapsed = 0;
public static void Initial(System.Windows.Forms.RichTextBox msgBox, int elapsed)
{
_msgBox = msgBox;
_elapsed = elapsed;
//初始定時器
if (_elapsed > 0)
{
_timer = new System.Timers.Timer() { Interval = elapsed * 1000, Enabled = true, AutoReset = true };
_timer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);
_timer.Start();
}
}
/// <summary>
/// 時間到時執行出隊操作
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
Dequeue();
}
可以看出,這里使用了靜態定時器對象,來進行定時訪問隊列信息功能(“非同步出隊”操作),這樣設計的原因主要是為用戶提供適合的配置方式,即如果不使用定時器(為0時),則系統會在日志入隊后,就立即啟動出隊(“同步出隊”)操作獲取日志信息并插入到MongoDB數據庫中。
下面介紹一下入隊操作實現:
/// 添加 httpModuleErrLogData日志信息
/// </summary>
/// <param name="httpModuleErrLogData"></param>
public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
{
Enqueue(httpModuleErrLogData);
if (_elapsed <=0) //如果使用定時器(為0 時),則立即執行出隊操作
Dequeue();
}
/// <summary>
/// 交換機名稱
/// </summary>
private const string EXCHANGE = "ex1";
/// <summary>
/// 交換方法,更多內容參見:http://melin.javaeye.com/blog/691265
/// </summary>
private const string EXCHANGE_TYPE = "direct";
/// <summary>
/// 路由key,更多內容參見:http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/
/// </summary>
private const string ROUTING_KEY = "m1";
/// <summary>
/// 日志入隊
/// </summary>
/// <param name="httpModuleErrLogData"></param>
public static void Enqueue(HttpModuleErrLogData httpModuleErrLogData)
{
Uri uri = new Uri(httpModuleErrorLogInfo.RabbitMQAddress);
ConnectionFactory cf = new ConnectionFactory()
{
UserName = httpModuleErrorLogInfo.UserName,
Password = httpModuleErrorLogInfo.PassWord,
VirtualHost = "dnt_mq",
RequestedHeartbeat = 0,
Endpoint = new AmqpTcpEndpoint(uri)
};
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (EXCHANGE_TYPE != null)
{
ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);//,true,true,false,false, true,null);
ch.QueueDeclare(httpModuleErrorLogInfo.QueueName, true);//true, true, true, false, false, null);
ch.QueueBind(httpModuleErrorLogInfo.QueueName, EXCHANGE, ROUTING_KEY, false, null);
}
IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "HttpErrLog";
IDictionary targetBody = b.Body;
targetBody["body"] = SerializationHelper.Serialize(httpModuleErrLogData);
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;//persistMode
ch.BasicPublish(EXCHANGE, ROUTING_KEY,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());
}
}
}
代碼很簡單,主要構造rabbitmq鏈接(ConnectionFactory)并初始化相應參數如用戶名,密碼,ROUTING_KEY等。
然后將傳入的日志對象序列化成字符串對象,賦值給targetBody["body"],這樣做主要是因為我沒找到更好的方法來賦值(之前嘗試直接綁定httpModuleErrLogData到targetBody["body"],但在出隊操作中找不到合適方法將httpModuleErrLogData對象解析出來)。下面就是出隊操作:
/// 日志出隊
/// </summary>
public static void Dequeue()
{
string serverAddress = httpModuleErrorLogInfo.RabbitMQAddress.Replace("amqp://", "").TrimEnd('/'); //"10.0.4.85:5672";
ConnectionFactory cf = new ConnectionFactory()
{
UserName = httpModuleErrorLogInfo.UserName,
Password = httpModuleErrorLogInfo.PassWord,
VirtualHost = "dnt_mq",
RequestedHeartbeat = 0,
Address = serverAddress
};
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
while (true)
{
BasicGetResult res = ch.BasicGet(httpModuleErrorLogInfo.QueueName, false);
if (res != null)
{
try
{
string objstr = System.Text.UTF8Encoding.UTF8.GetString(res.Body).Replace("\0\0\0body\0\n", "");//去掉頭部信息
object obj = SerializationHelper.DeSerialize(typeof(HttpModuleErrLogData), objstr);
HttpModuleErrLogData httpModuleErrLogData = obj as HttpModuleErrLogData;
if (httpModuleErrLogData != null)
{
MongoDbHelper.Insert(new Mongo(httpModuleErrorLogInfo.MongoDB), "dnt_httpmoduleerrlog", LoadAttachment(httpModuleErrLogData));
_msgBox.BeginInvoke(new ShowMsg(SetMsgRichBox), "\r發生時間:" + httpModuleErrLogData.TimeStamp + "\r錯誤等級:" + httpModuleErrLogData.Level + "\r詳細信息:" + httpModuleErrLogData.Message);
ch.BasicAck(res.DeliveryTag, false);
}
}
catch { }
}
else
break;
}
}
}
}
出隊操作也是先實例化鏈接到rabbitmq 的實例,并循環使用BasicGet方法來單條獲取隊列信息,并最終將res.Body的數據序列化成HttpModuleErrLogData對象,并最終插入到mongodb數據庫中。同時將獲取的隊列消息顯示出來:
這里使用異步方式顯示出隊的日志信息,其聲明的delegate 方法“ShowMsg”如下:
/// 聲明委托
/// </summary>
/// <param name="message"></param>
public delegate void ShowMsg(string message);
/// <summary>
/// 綁定到上面 delegate的方法
/// </summary>
/// <param name="outPut"></param>
public static void SetMsgRichBox(string outPut)
{
_msgBox.Text += "\r================================== \r下列錯誤信息出隊時間=>" + DateTime.Now + outPut + "\r";
}
同時使用LoadAttachment方法來實現HttpModuleErrLogData到mongodb的Document類型的轉換:
/// 將 HttpModuleErrLogData轉換成Document類型
/// </summary>
/// <param name="httpModuleErrLogData"></param>
/// <returns></returns>
public static Document LoadAttachment(HttpModuleErrLogData httpModuleErrLogData)
{
Document doc = new Document();
doc["_id"] = httpModuleErrLogData.Oid;
doc["level"] = httpModuleErrLogData.Level;
doc["message"] = httpModuleErrLogData.Message;
doc["timestamp"] = httpModuleErrLogData.TimeStamp;
return doc;
}
到這里,主要的功能介紹就差不多了。當然本文所闡述的只是一個原型,相信會隨著對rabbitmq的理解深入而不斷完善,感興趣的朋友歡迎討論交流,以糾正我認識上的偏差,呵呵。
留言列表