文章出處

回到目錄

關于持久化到Redis的消息格式,主要是說在Broker上把消息持久化的過程中,需要存儲哪些類型的消息,因為我們的消息是分topic的,而每個topic又有若干個queue組成,而我們的topic和queue由于redis存儲結構的原因,我們需要將它們分區對應存儲一下,而不能像關系型數據庫那樣靈活,所以要額外設計幾個數據結構來存儲它們。

一 Topic字典

二 Topic對應的Queue字典

三 Queue里的消息

四 某個客戶端對應某個Queue的消費進度

以上四個結構是我們要說的,它們會在推消息,拉消息,刪消息時用到,下面一一介紹一下,講的不好不對的地方,歡迎大家為大叔留言。

一 Topic字典

主要存儲每個topic,它是一個set集合,redis的我集合類型之一,每個key是唯一的LindMq_Topic,值value就是我們客戶端傳來的具體topic的名字,這主要是在刪除過期的消息時用的,主是作用是遍歷所有的topic消息類型,這樣我們在刪除消息時,就可以把所有注冊的topic都找到了,最后把過期的刪除,默認消息存活周期是一天。

刪除過期的消息代碼如下

 var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);
  foreach (var topic in topicList)
   {
     var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic);
         foreach (var queue in queueList)
           {
            var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd");
              RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey);
            }
    }

二 Topic對應的Queue字典

我們知道,為了加大redis的并發量和吞吐量,我們會把大數據鍵值對設計成多個鍵,這就像是一個集群環境的sharing,就是將大數據進行分片,而我們的分片規則是采用按對象取模的方式,模數可以自己設置,比較我設置8,那說明我的隊列(分片)最多可以被分為8個,這個大家可以去做測試,挺有意思的,比隨機數來個直接!而這一次redis里的鍵就是某個topic,而值就是我們的topic加上隊列索引,例如你的topic是zzl,那么隊列里的鍵可能就是zzl0,zzl1,zzl2...

三 Queue里的消息

我們的生產者將消息發送到broker里,然后于broker將消息持久化到具體的存儲介質里,當然這里我們用的是Redis,在存儲在redis里時,我們的具體隊列的鍵是有后綴的,這主要用于消息的回收,因為我們打算1天回收一次消息,所以我們的消息后綴是個日期變量,當然精確到天就可以了,它可以是這樣鍵名LindMQ_order_Paid4_20161202,每個隊列都有自己的后綴,我們在清除消息時也就有了方法了。我們的隊列存儲結構是比較特殊的sortedSet ,就是可排序的集合,它有權重的概念,我們剛好可以使用這個特性來記錄客戶端的消費進度,因為我們的權重值在一個redis鍵/值對里是唯一的。

下面代碼選自Push入隊列的代碼片斷,分享給大家

       //存儲當前Topic
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic);

            //要存儲到哪個隊列
            body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);
            var dataKey = body.Topic + body.QueueId;
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey);

            //記錄偏移
            var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));
            body.QueueOffset = offset + 1;

            //存儲消息
            RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(
                GetRedisDataKey(dataKey),
                Utils.SerializeMemoryHelper.SerializeToJson(body),
                score: body.QueueOffset);

四 某個客戶端對應某個Queue的消費進度

消費進度是一個很麻煩的問題,生產者的消息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出隊列后就消失了,這是不靠譜的,萬一消失失敗了,也會造成消息的丟失!下面我們主要看一下消費進度的存儲,它是一個Hash集合,其中redis的鍵名是LindMQ_ConsumerOffset,而value是一個hash對象,hash里的key是當前隊列名+消費者IP地址的hashcode值,hash里的value是這個消費者(客戶端)的消費進度(Queue里的權重,Queue的存儲結構是一個sortedSet)。

客戶端消費的測試代碼

            #region Client-LindMQ
            var consumer = new ConsumerSetting
            {
                BrokenName = "test",
                BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406),
                Callback = new Dictionary<string, Action<MessageBody>>() { 
                {"zzl",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(1000);
                }},
                {"zhz",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(2000);
                }}
                }
            };
            var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });
            consumerClient.Start();
            #endregion

客戶端消費的測試結果

好了,到這里我們的LindMQ里數據存儲結構的內容就講完了,主要使用了redis里的set,sortedSet,hash等數據結構,在設計過程中,使用了分片(Sharing)的概念,當然也是借鑒了mongodb和redis集群的設計理念,同時借鑒了方雪華老兄的EQueue設計理念,在這里和他們說一聲:謝謝!

感謝各位對Lind的支持!

回到目錄

 


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()