文章出處
文章列表
定義
廣播消息是指生產者產生的消息將分發給所有訂閱這個消息的消費者,而普通的模式是:一批消息可以被多個人共同消費,如consumer1可能消費1,3,5記錄,而consumer2可能消費的是2,4,6這種模塊就是共同消費模塊;而今天說的是廣播消息,它是指一些消息同時被推送到多個訂閱者,而這些訂閱者收到的消息都是完整的,如consumer1收到的會是1,2,3,4,5,6,而consumer2回到的也會是1,2,3,4,5,6,這種就像廣播一樣,把消息廣播給多人!
實質上是對Fanout類型的exchange的實現
通過我們RabbitMq的后臺可以看到,它會使用fanout模式,并且會自己添加隊列,當然隊列名稱也是動態的.
廣播模式的生產者
static void TestFanout(int _index) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { const string EXCHANGE_NAME = "logs"; const string ROUTING_KEY = ""; channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//廣播 var message = "hello out"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(EXCHANGE_NAME, ROUTING_KEY, null, body);//不需要指定routing key,設置了fanout,指了也沒有用. Console.WriteLine(" [x] Sent {0}", message + _index); } } }
廣播模式的消費者
static void TestFanout() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { const string EXCHANGE_NAME = "logs"; const string ROUTING_KEY = ""; channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//廣播 QueueDeclareOk queueOk = channel.QueueDeclare(); string queueName = queueOk.QueueName; channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//不需要指定routing key,設置了fanout,指了也沒有用. var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, true, consumer); Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C"); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//掛起的操作 var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); } } } }
通過測試我們發布,由producer生產的消息,已經被推送到所有消費者那邊了...
以上就是RabbitMQ的廣播模式,通過本講的學習,我們知道隊列的又一用法!
感謝各位的閱讀!
文章列表
全站熱搜
留言列表