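Delayed messages in ActiveMQ are a feature people both love and hate. For usage details, see the previous post, ActiveMQ Notes (6): Delayed Message Delivery. The feature is very useful in business scenarios that need delayed delivery, but it has a drawback: under heavy traffic, if a huge number of delayed messages are sent to MQ in an instant and exceed the broker's scheduling capacity, many messages reach their delivery time without actually being delivered. They pile up and stay in the Scheduled panel of the ActiveMQ web console.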
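The code below shows how to clean up delayed messages in ActiveMQ, either by clearing all of them or by clearing only those scheduled within a given time range. As far as I know, this is currently the only workable approach.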
To keep the demo simple, first wrap things in a small utility class:
package cn.mwee.utils.mq;

import cn.mwee.utils.list.ListUtil;
import cn.mwee.utils.log4j2.MwLogger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by yangjunming on 6/20/16.
 */
public final class MessageUtil {

    // This is just a Log4j2 wrapper; swap in plain log4j2 or a similar tool if you prefer
    private Logger logger = new MwLogger(MessageUtil.class);

    private ConnectionFactory connectionFactory;
    private long receiveTimeout; // receive timeout in milliseconds
    private JmsTemplate jmsTemplate;
    private List<String> destinationQueueNames;
    private final static String BACKUP_QUEUE_SUFFIX = "_B";
    private boolean autoBackup = false; // whether to also copy each message to the "_B" queue, handy for debugging

    public MessageUtil(final ConnectionFactory connectionFactory, final long receiveTimeout, final List<String> destinationQueueNames) {
        this.connectionFactory = connectionFactory;
        this.receiveTimeout = receiveTimeout;
        // Defensive copy so later changes to the caller's list do not affect this instance
        this.destinationQueueNames = new ArrayList<>(destinationQueueNames);
        jmsTemplate = new JmsTemplate(this.connectionFactory);
        jmsTemplate.setReceiveTimeout(this.receiveTimeout);
    }

    public MessageUtil(ConnectionFactory connectionFactory, List<String> destinationQueueNames) {
        this(connectionFactory, 10000, destinationQueueNames);
    }

    /** Send the message to every configured destination queue. */
    public void convertAndSend(Object message) {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            logger.error("Destination queue list is empty, cannot send; please check the configuration! message => " + message.toString());
            return;
        }
        for (String dest : destinationQueueNames) {
            jmsTemplate.convertAndSend(dest, message);
            if (autoBackup) {
                jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message);
            }
        }
    }

    /** Send the message to every configured destination queue, applying the post-processor first. */
    public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            logger.error("Destination queue list is empty, cannot send; please check the configuration! message => " + message.toString());
            return;
        }
        for (String dest : destinationQueueNames) {
            jmsTemplate.convertAndSend(dest, message, messagePostProcessor);
            if (autoBackup) {
                jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
            }
        }
    }

    /** Send the message to a single named destination. */
    public void convertAndSend(String destinationName, Object message) {
        if (StringUtils.isBlank(destinationName)) {
            logger.error("Destination queue name is blank, cannot send; please check the configuration! message => " + message.toString());
            return;
        }
        jmsTemplate.convertAndSend(destinationName, message);
        if (autoBackup) {
            jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message);
        }
    }

    /** Send the message to a single named destination, applying the post-processor first. */
    public void convertAndSend(String destinationName, Object message, MessagePostProcessor messagePostProcessor) {
        if (StringUtils.isBlank(destinationName)) {
            logger.error("Destination queue name is blank, cannot send; please check the configuration! message => " + message.toString());
            return;
        }
        jmsTemplate.convertAndSend(destinationName, message, messagePostProcessor);
        if (autoBackup) {
            jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
        }
    }

    /** Return the text body of a TextMessage, falling back to toString() otherwise. */
    public static String getText(javax.jms.Message message) {
        if (message instanceof TextMessage) {
            try {
                return ((TextMessage) message).getText();
            } catch (JMSException e) {
                return message.toString();
            }
        }
        return message.toString();
    }

    public String getFirstDestination() {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            return null;
        }
        return destinationQueueNames.get(0);
    }

    public boolean isAutoBackup() {
        return autoBackup;
    }

    public void setAutoBackup(boolean autoBackup) {
        this.autoBackup = autoBackup;
    }
}
The only method that really matters here is convertAndSend(Object message, MessagePostProcessor messagePostProcessor); the rest of the code can be ignored.
First, simulate sending a large number of delayed messages to MQ in an instant:
/**
 * Send delayed messages.
 *
 * @param messageUtil utility used to send the messages
 */
private static void sendScheduleMessage(MessageUtil messageUtil) {
    for (int i = 0; i < 10000; i++) {
        Object obj = "test:" + i;
        // Message i is delayed by (i + 1) seconds, expressed in milliseconds
        messageUtil.convertAndSend(obj, new ScheduleMessagePostProcessor(1000 + i * 1000));
    }
}
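This sends 10,000 delayed messages to MQ; message i is delayed by (i + 1) seconds, so the delays range from 1 second to 10,000 seconds. The ScheduleMessagePostProcessor class used above can be found in the previous post.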
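To get a feel for how far into the future that loop schedules messages, the delay arithmetic can be sketched on its own. This is only an illustration: the class and method names (DelaySchedule, delayMillis) are hypothetical and not part of the original code; only the formula 1000 + i * 1000 comes from the loop above.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only: reproduces the delay formula
// used in sendScheduleMessage so the schedule can be inspected without a broker.
final class DelaySchedule {

    private DelaySchedule() {
    }

    /** Delay in milliseconds for the message with loop index i (same formula as the loop above). */
    static long delayMillis(int i) {
        return 1000L + i * 1000L;
    }

    public static void main(String[] args) {
        // First message: due 1 second after it is sent
        System.out.println(Duration.ofMillis(delayMillis(0)));    // prints PT1S
        // Last of the 10,000 messages: due 10,000 seconds after it is sent
        System.out.println(Duration.ofMillis(delayMillis(9999))); // prints PT2H46M40S
    }
}
```

So the batch spreads its delivery times over roughly two and three quarter hours, which is why the messages stay visible in the scheduler for a long time.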
After it runs, a large number of messages should have piled up in MQ.
The following code clears all delayed messages:
/**
 * Delete all delayed messages.
 *
 * @param connectionFactory factory used to connect to the broker
 * @throws JMSException if the removal request cannot be sent
 */
private static void deleteAllScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
    Connection conn = connectionFactory.createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Requests to the scheduler are sent to a dedicated management topic
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        producer.send(request);
    } finally {
        // Closing the connection also closes its session and producer
        conn.close();
    }
}
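Clearing every delayed message is somewhat heavy-handed. Often we only need to clean up the overdue ones, that is, messages that were planned for delivery at 8:00 but still have not been delivered after 8:00.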
/**
 * Delete overdue delayed messages.
 *
 * @param connectionFactory factory used to connect to the broker
 * @throws JMSException if the removal request cannot be sent
 */
private static void deleteExpiredScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
    // Remove delayed messages scheduled within the 12 hours before the current time
    long start = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);
    long end = System.currentTimeMillis();
    Connection conn = connectionFactory.createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        // Restrict the removal to the [start, end] time range (epoch milliseconds as strings)
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        producer.send(request);
    } finally {
        // Closing the connection also closes its session and producer
        conn.close();
    }
}
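This is essentially the same as the previous snippet, except that it also specifies the start and end of the time range to delete.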
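The time-range calculation can be pulled out and checked on its own, without a broker. The sketch below is an illustration only: RemovalWindow and its methods are hypothetical names, and treating both bounds as inclusive is an assumption made for the example; how the broker handles messages exactly on the boundary is not covered here.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only: builds the start/end values
// that the removal request carries as string properties.
final class RemovalWindow {

    final long startMillis;
    final long endMillis;

    RemovalWindow(long startMillis, long endMillis) {
        if (startMillis > endMillis) {
            throw new IllegalArgumentException("start must not be after end");
        }
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }

    /** Window covering the given number of hours before nowMillis, ending at nowMillis. */
    static RemovalWindow lastHours(long nowMillis, long hours) {
        return new RemovalWindow(nowMillis - TimeUnit.HOURS.toMillis(hours), nowMillis);
    }

    /** Epoch milliseconds as a string, the form used for the start-time property. */
    String startProperty() {
        return Long.toString(startMillis);
    }

    /** Epoch milliseconds as a string, the form used for the end-time property. */
    String endProperty() {
        return Long.toString(endMillis);
    }

    /** True if a planned delivery time falls inside the window (inclusive bounds are an assumption). */
    boolean covers(long scheduledMillis) {
        return startMillis <= scheduledMillis && scheduledMillis <= endMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        RemovalWindow window = RemovalWindow.lastHours(now, 12);
        // A message planned for one hour ago is overdue and inside the window
        System.out.println(window.covers(now - TimeUnit.HOURS.toMillis(1)));
        // A message planned for one hour from now is not overdue and stays outside
        System.out.println(window.covers(now + TimeUnit.HOURS.toMillis(1)));
    }
}
```

Because the window ends at the current time, messages still waiting for a future delivery time are left alone; only those whose planned time has already passed fall inside it.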
Finally, here are the Spring configuration file and the main entry point:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"
                          value="failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false&amp;backup=true"/>
                <property name="maxThreadPoolSize" value="100"/>
            </bean>
        </property>
    </bean>

    <bean id="messageUtil" class="cn.mwee.utils.mq.MessageUtil">
        <constructor-arg index="0" ref="jmsFactory"/>
        <constructor-arg index="1" value="10000"/>
        <constructor-arg index="2">
            <list>
                <value>dest1</value>
                <value>dest2</value>
            </list>
        </constructor-arg>
        <property name="autoBackup" value="true"/>
    </bean>

</beans>
The main function:
public static void main(String[] args) throws InterruptedException, JMSException {
    ApplicationContext context = new ClassPathXmlApplicationContext("spring-sender.xml");
    // Look the bean up by name and type; getBean(Class, Object...) would treat "jmsFactory" as a constructor argument
    ConnectionFactory connectionFactory = context.getBean("jmsFactory", ConnectionFactory.class);
    MessageUtil messageUtil = context.getBean(MessageUtil.class);
    // sendScheduleMessage(messageUtil);
    // deleteAllScheduleMessage(connectionFactory);
    deleteExpiredScheduleMessage(connectionFactory);
}