前言:在這里我將用java來簡單的實現rabbitMQ。下面我們帶著下面問題來一步步的了解和學習rabbitMQ。
1:如果消費者連接中斷,這期間我們應該怎么辦
2:如何做到負載均衡
3:如何有效的將數據發送到相關的接收者?就是怎么樣過濾
4:如何保證消費者收到完整正確的數據
5:如何讓優先級高的接收者先收到數據
一:"Hello RabbitMQ"
下面有一幅圖,其中P表示生產者,C表示消費者,紅色部分為消息隊列
二:項目開始
2.1:首先引入rabbitMQ jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
2.2:創建消費者Producer
/** * 消息生成者 */ public class Producer { public final static String QUEUE_NAME="rabbitMQ.test"; public static void main(String[] args) throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ相關信息 factory.setHost("localhost"); //factory.setUsername("lp"); //factory.setPassword(""); // factory.setPort(2088); //創建一個新的連接 Connection connection = factory.newConnection(); //創建一個通道 Channel channel = connection.createChannel(); // 聲明一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello RabbitMQ"; //發送消息到隊列中 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send +'" + message + "'"); //關閉通道和連接 channel.close(); connection.close(); } }
注1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數
注2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體
2.3:創建消費者
public class Customer { private final static String QUEUE_NAME = "rabbitMQ.test"; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ地址 factory.setHost("localhost"); //創建一個新的連接 Connection connection = factory.newConnection(); //創建一個通道 Channel channel = connection.createChannel(); //聲明要關注的隊列 channel.queueDeclare(QUEUE_NAME, false, false, true, null); System.out.println("Customer Waiting Received messages"); //DefaultConsumer類實現了Consumer接口,通過傳入一個頻道, // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; //自動回復隊列應答 -- RabbitMQ中的消息確認機制 channel.basicConsume(QUEUE_NAME, true, consumer); }
前面代碼我們可以看出和生成者一樣的,后面的是獲取生產者發送的信息,其中envelope主要存放生產者相關信息(比如交換機、路由key等)body是消息實體。
2.4:運行結果
生產者:
消費者:
三:實現任務分發
工作隊列
一個隊列的優點就是很容易處理并行化的工作能力,但是如果我們積累了大量的工作,我們就需要更多的工作者來處理,這里就要采用分布機制了。
我們新創建一個生產者NewTask
public class NewTask { private static final String TASK_QUEUE_NAME="task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("localhost"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); //分發信息 for (int i=0;i<10;i++){ String message="Hello RabbitMQ"+i; channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("NewTask send '"+message+"'"); } channel.close(); connection.close(); } }
然后創建2個工作者Work1和Work2代碼一樣
public class Work1 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Worker1 Waiting for messages"); //每次從隊列獲取的數量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Worker1 Received '" + message + "'"); try { throw new Exception(); //doWork(message); }catch (Exception e){ channel.abort(); }finally { System.out.println("Worker1 Done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck=false; //消息消費完成確認 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { try { Thread.sleep(1000); // 暫停1秒鐘 } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
注:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回復,如果為true的話,每次生產者只要發送信息就會從內存中刪除,那么如果消費者程序異常退出,那么就無法獲取數據,我們當然是不希望出現這樣的情況,所以才去手動回復,每當消費者收到并處理信息然后在通知生成者。最后從隊列中刪除這條信息。如果消費者異常退出,如果還有其他消費者,那么就會把隊列中的消息發送給其他消費者,如果沒有,等消費者啟動時候再次發送。
關于上面我們遺留問題在下一篇繼續講解
文章列表