上篇《RabbitMQ入門-Routing直連模式》我們介紹了可以定向發送消息,并可以根據自定義規則派發消息。看起來,這個Routing模式已經算靈活的了,但是,這還不夠,我們還有更加多樣靈活的Topic模式。
Topic模式
模型組成相較前幾種沒有什么變化,一個生產者P,一個交換機X,多個消息隊列Q以及多個消費者C
在Exchange派發消息到消息隊列Queue所用的規則不同,我們看到了有符號"*"以及"#",可以認為是通配符
"*"用于匹配一個單詞,比如"a","abc"等;"#"用于匹配0個或者多個單詞,比如"", "abc", "abc.def"等
發送端
/**
* Created by jackie on 17/8/7.
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();
}
private static String getSeverity(String[] strings){
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings){
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0 ) return "";
if (length < startIndex ) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);這里指定的Exchagne模式為Topic模式
通過String routingKey = getRouting(argv);實現在Program arguments中填寫routing key參數
通過String message = getMessage(argv);實現在Program arguments中填寫發送的消息
這時候我們給Program argument賦值如下,并啟動發送端程序
程序運行完,可以在RabbitMQ管理應用中看到名為“topic_logs”的Exchange。
接收端
/**
* Created by jackie on 17/8/7.
*/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
和Routing模式異曲同工,聲明與發送端一樣的Exchange名稱
通過Program arguments得到的routing key的輸入參數,并將其與Exchange綁定,這時候就可以使用靈活的通配符了
運行情況
我們將啟動兩個消費者,并分別制定兩套Routing key的規則。
第一個消費者
第二個消費者
啟動兩個消費者后,使用發送端發送一條消息,我們可以發現兩個消費者都通過Routing key規則派發到了消息
注意:實際上如果Routing key寫成了“#”表示能夠接受所有的消息,類似廣播模式。
這就是Topic模式,到此為止,幾大主要RabbitMQ模式已經講完了。你是否對于RabbitMQ有了一個基本的了解了?
如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,并和您一起分享我日常閱讀過的優質文章。
文章列表