文章出處

Hello World模式,告訴我們如何一對一發送和接收消息;

Work模式,告訴我們如何多管齊下高效的消費消息;
Publish/Subscribe模式,告訴我們如何廣播消息
那么有沒有靈活強一點的既可以高效消費,又可以同時送達多個消費者的模式?
有,這就是Routing模式,我又稱之為Direct直連模式。

Routing模式

routing模式.png

  • 一個生產者P,一個交換機X,多個消息隊列Q以及多個消費者C

  • 在Exchange和Queue中,我們看到了不同的規則,也就是Routing Key

顯然從圖中的說明,我們就知道這是一個log日志根據級別派發消息的例子。熟悉Log日志系統的應該都知道,一般的log系統分為error、info、warn和debug等。從圖中我們可以看出,將日志級別為error的定向的派發到第一個消息隊列,將error、warn和info級別的日志派發到第一個消息隊列。

該模型首先實現了定向派發,而不再是訂閱模式那種廣播式的派發。同一條消息既可以派發給一個Queue,也可以同時派發給兩個或者多個Queue,這就是該模式的靈活之處。下面來看看實例

發送端

/**
 * Created by jackie on 17/8/7.
 */
public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }

    private static String getSeverity(String[] strings){
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings){
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0 ) return "";
        if (length < startIndex ) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}
  • String severity = getSeverity(argv);通過程序參數賦值給Routing Key,作為發送消息的規則

  • String message = getMessage(argv);通過程序參數賦值作為消息實體發送到Queue

在run configurations中配置argv

image.png

*第一個參數是要綁定key的名稱,第二個參數是要發送的消息內容

  • 運行后,可以在RabbitMQ管理應用中看到exchange,但是此時沒有綁定queue,所以即使發送消息也沒有queue會存儲或者消費。

image.png

接收端

/**
 * Created by jackie on 17/8/7.
 */
public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.3.161");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
  • channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);表示使用的exchange類型為Direct類型

  • 綁定的queue的名稱也是通過program arguments指定的

image.png

這里兩個參數info和error表示綁定了兩個routing key,即如果發送routing key為info的消息該隊列能接收到,如果發送routing key為error,該隊列也能收到

運行情況

啟動接收端代碼,我們可以看到生成了Queue名稱為amq.gen-ugjKo6t4y0PXPwoh3CeubA的隊列,同時有routingKey=info和routingKey=error的綁定到了Exchange上。

image.png

這時候起送發送端給routingkey為info發送消息“hello world”,我們可以看到在接收端確實能夠收到消息“hello world”,同理,這時候發送routingkey為error的消息,該隊列同樣能夠接收到,因為隊列同時綁定了兩個routing key

image.png

這個就是Routing直連模式。

如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,并和您一起分享我日常閱讀過的優質文章。


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()