文章出處

在上篇《RabbitMQ-高效的Work模式》中,我們了解了Work模型,該模型包括一個生產者,一個消息隊列和多個消費者。
我們已經通過實例看出消息隊列中的消息是如何被一個或者多個消費者消費的了,但是對于具體的實現細節和原理并沒有介紹。這篇就來詳細介紹下在消息派發這個過程中還有那些我們需要關注的點和細節。
這篇主要討論細節都集中在接收端,我們還是來看下上篇中,接收端的代碼實現

package com.ximalaya.openapi.rabbitmq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
/**
 * Created by jackie on 17/8/4.
 */
public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

消息是怎么合理的派發給各個消費者的

在上篇介紹的實例中,我們看到運行兩個消費者,這時候生產者生產的4條信息是均勻的派發給了兩個消費者。

你可能會好奇,這個消息隊列Queue怎么會這么“智能”,能夠做到如此公平的進行消息派發。看完下面的場景你可能就不覺得RabbitMQ這樣做是聰明了。

其實,默認情況下的RabbitMQ就是這么“智能”,公平、公正、公開的將4個消息依次派發給兩個消費者。如果啟動了四個消費者,那也將是每個消費者消費一條消息。

這是為什么呢?
RabbitMQ派發消息默認采用的是輪詢機制,輪詢,顧名思義就是挨個的派發,就是第一個派發給C1,第二個派發給C2,第三個派發給C1,第四個派發給C4。正常情況下,這樣很好,但是如果遇到某個消費者在消費某個消息時花費時間很長或者因為自身原因或者網絡原因阻塞,那么按照這種輪詢的策略就顯得不合適了。

假設C2在執行第二個派發的消息一直卡住,這時候即使派發新的消息,C2也無法正常消費,如果一直這么盲目的派發消息給C2,只會讓更多的消息無法正常消費,直至消息隊列卡住崩潰。

這時候我們采用一種新的機制,姑且稱為"公平機制"。該機制下,我們在同一時間內只給消費者派發一個消息(派發的數量可以人工配置),RabbitMQ只有等到該消費者確認消費了上一條消息后,才會繼續派發下一條消息。

這個代碼實現也很簡單,就是上面接收端中的
channel.basicQos(1);
這里的數字1就是剛剛提到可以人工配置的派發消息的數量。

實例驗證
要驗證有basicQos和沒有basicQos,我們需要做一些分析,并對代碼做部分改動。

當前啟動消費端,每個消費者消費的時間都是固定2秒,即使加上basicQos,因為兩個隊列的消費速率相同,所以最終還是會出現兩個消費者各自消費兩條消息的情況。

為了營造其中一個消費者卡住的情況,我們將后面啟動的消費者的消費時間設定為8秒,這樣第一個消費者即使消費了三條消息,這時候第二個消費者仍然卡住,便能看到效果。

下面先看沒有添加basicQos的情況,第一個和第二個消費者的消費時間分別是2秒和8秒
31ef00018637eb681546

第一個Work消費時間是2秒的,第二個是8秒

看完整個消費過程,會發現沒有basiQos設置,會執行輪詢策略,每個消費者都消費了兩個消息

再看添加basicQos的情況,第一個和第二個消費者的消費時間同樣分別是2秒和8秒
31e800047b1ec8ee8bab

同樣,第一個Work消費時間是2秒的,第二個是8秒

看完整個消費過程,會發現有basiQos設置,會執行公平機制,第一個消息給C1,第二個給C2,第三個消息來的時候,這時候發現C2還在消費,就派發給了已經消費完空閑的C1,第四個消息來的時候,發現C2仍然在消費,這時候就把消息派發給了消費完第三個消息的C1,C1總共消費3條消息用時6秒,而C2消費一條消息時8秒,所以這就是公平機制。

對比完后,我們發現這種公平機制更加合理,能夠很好的做到負載均衡,避免因為不顧消費者的消費情況而盲目派發情況的出現。

如何保證派發出去的消息不丟失

現在如果出現這樣的一種情況:消息從Queue中取出,但是沒有消費者因為各種情況并沒有完成這條消息的消費,但是這條消息已經從內存中刪除了,這就意味著這個消息模型就失去了這條消息,這種意外在大多數場景下是不允許出現的。

為什么會出現這種情況呢?
因為消息出去的時候,RabbitMQ就將其從Queue中刪除,也就是從內存中刪除,這樣做的假設前提就是默認為這條消息能夠被正常消費掉,但事實情況往往并非如此。如果此時我們加上一個確認機制,類似于TCP的三次握手,問題就能夠得到解決。

RabbitMQ將消息派發出去后并不立馬將消息從內存中刪除,等到消費端完成消費返回一個ack的標識,RabbitMQ接收到這個字段后認為消息時正常消費了在完成刪除。如果沒有收到確認標識ack,則認為消息違背正常消費,則會重新取回該條,采用輪詢或者其他機制將其派發到下一個消費者供其消費。

實例
在接收端將代碼中
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
將basicConsume函數的第二個參數改為true標識autoAck=true,即自動確認,如果設置為false,則表示需要接收端手工確認。

上篇,我們用的就是false的情況,即手動確認方式,所以在上篇的運行接口我們看到Unacknowleged標識一直從1變為0,是說明采用的是一條一條確認的機制,從第一條消息一直到第四條消息消費完成。

下面我們看看autoAck=true運行時Ready和Unacknowleged指標的變化趨勢,我們只啟動一個消費者
31e8000486d0e0d62566
請點擊此處輸入圖片描述
從運行過程可以發現,Unackowleged從0->4->3->2->1->0,autoAck=false是為0->1->1->1->1->0
說明autoAck=false時是一次性派發了4條信息,沒有顧忌消費者是否有發送確認標識。之后消費者再依次完成消費。
​如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,并和您一起分享我日常閱讀過的優質文章。


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()