文章出處

ActiveMQ的延時消息是一個讓人又愛又恨的功能,具體使用可參考上篇ActiveMQ筆記(6):消息延時投遞,在很多需要消息延時投遞的業務場景十分有用,但是也有一個缺陷,在一些大訪問量的場景,如果瞬間向MQ發送海量的延時消息,超過MQ的調度能力,就會造成很多消息到了該投遞的時刻,卻沒有投遞出去,形成積壓,一直停留在ActiveMQ web控制臺的Scheduled面板中。

下面的代碼演示了,如何清理activemq中的延時消息(包括:全部清空及清空指定時間段的延時消息),這也是目前唯一可行的辦法。

為了演示方便,先封裝一個小工具類:

package cn.mwee.utils.mq;

import cn.mwee.utils.list.ListUtil;
import cn.mwee.utils.log4j2.MwLogger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Created by yangjunming on 6/20/16.
 */
public final class MessageUtil {

    private Logger logger = new MwLogger(MessageUtil.class);//這里就是一個Log4j2的實例,大家可以換成原生的log4j2或類似工具

    private ConnectionFactory connectionFactory;
    private long receiveTimeout;//接收超時時間
    private JmsTemplate jmsTemplate;
    private List<String> destinationQueueNames;
    private final static String BACKUP_QUEUE_SUFFIX = "_B";
    private boolean autoBackup = false;//是否自動將消息備份到_b的隊列,方便調試


    public MessageUtil(final ConnectionFactory connectionFactory, final long receiveTimeout, final List<String> destinationQueueNames) {
        this.connectionFactory = connectionFactory;
        this.receiveTimeout = receiveTimeout;
        this.destinationQueueNames = new ArrayList<>();
        this.destinationQueueNames.addAll(destinationQueueNames.stream().collect(Collectors.toList()));
        jmsTemplate = new JmsTemplate(this.connectionFactory);
        jmsTemplate.setReceiveTimeout(this.receiveTimeout);
    }

    public MessageUtil(ConnectionFactory connectionFactory, List<String> destinationQueueNames) {
        this(connectionFactory, 10000, destinationQueueNames);
    }


    public void convertAndSend(Object message) {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            logger.error("目標隊列為空,無法發送,請檢查配置!message => " + message.toString());
            return;
        }
        for (String dest : destinationQueueNames) {
            jmsTemplate.convertAndSend(dest, message);
            if (autoBackup) {
                jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message);
            }
        }
    }

    public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            logger.error("目標隊列為空,無法發送,請檢查配置!message => " + message.toString());
            return;
        }
        for (String dest : destinationQueueNames) {
            jmsTemplate.convertAndSend(dest, message, messagePostProcessor);
            if (autoBackup) {
                jmsTemplate.convertAndSend(dest + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
            }
        }
    }

    public void convertAndSend(String destinationName, Object message) {
        if (StringUtils.isBlank(destinationName)) {
            logger.error("目標隊列為空,無法發送,請檢查配置!message => " + message.toString());
            return;
        }
        jmsTemplate.convertAndSend(destinationName, message);
        if (autoBackup) {
            jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message);
        }
    }


    public void convertAndSend(String destinationName, Object message, MessagePostProcessor messagePostProcessor) {
        if (StringUtils.isBlank(destinationName)) {
            logger.error("目標隊列為空,無法發送,請檢查配置!message => " + message.toString());
            return;
        }
        jmsTemplate.convertAndSend(destinationName, message, messagePostProcessor);
        if (autoBackup) {
            jmsTemplate.convertAndSend(destinationName + BACKUP_QUEUE_SUFFIX, message, messagePostProcessor);
        }
    }

    public static String getText(javax.jms.Message message) {
        if (message instanceof TextMessage) {
            try {
                return ((TextMessage) message).getText();
            } catch (JMSException e) {
                return message.toString();
            }
        }
        return message.toString();
    }

    public String getFirstDestination() {
        if (ListUtil.isEmpty(destinationQueueNames)) {
            return null;
        }
        return destinationQueueNames.get(0);
    }


    public boolean isAutoBackup() {
        return autoBackup;
    }

    public void setAutoBackup(boolean autoBackup) {
        this.autoBackup = autoBackup;
    }
}

其中主要就用到了convertAndSend(Object message, MessagePostProcessor messagePostProcessor) 這個方法,其它代碼可以無視。

先來模擬瞬間向MQ發送大量延時消息:

    /**
     * 發送延時消息
     *
     * @param messageUtil
     */
    private static void sendScheduleMessage(MessageUtil messageUtil) {
        for (int i = 0; i < 10000; i++) {
            Object obj = "test:" + i;
            messageUtil.convertAndSend(obj, new ScheduleMessagePostProcessor(1000 + i * 1000));
        }
    }

這里向MQ發送了1w條延時消息,每條消息延時1秒*i,上面代碼中的ScheduleMessagePostProcessor類可在上篇中找到。

運行完之后,MQ中應該堆積著了很多消息了:

下面的代碼可以清空所有延時消息:

    /**
     * 刪除所有延時消息
     *
     * @param connectionFactory
     * @throws JMSException
     */
    private static void deleteAllScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
        Connection conn = connectionFactory.createConnection();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        producer.send(request);
    }

清空所有延時消息,有些用力過猛了,很多時候,我們只需要清理掉過期的延時消息(即:本來計劃是8:00投遞出去的消息,結果過了8點還沒投遞出去) 

    /**
     * 刪除過期的延時消息
     *
     * @param connectionFactory
     * @throws JMSException
     */
    private static void deleteExpiredScheduleMessage(final ConnectionFactory connectionFactory) throws JMSException {
        long start = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12);//刪除:當前時間前12小時范圍的延時消息
        long end = System.currentTimeMillis();
        Connection conn = connectionFactory.createConnection();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        MessageProducer producer = session.createProducer(management);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
        producer.send(request);
    }

與上一段代碼基本相似,只是多指定了刪除消息的起止時間段。  

最后貼一段spring的配置文件及main函數入口

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 5 
 6     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
 7         <property name="connectionFactory">
 8             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
 9                 <property name="brokerURL"
10                           value="failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false&amp;backup=true"/>
11                 <property name="maxThreadPoolSize" value="100"/>
12             </bean>
13         </property>
14     </bean>
15 
16     <bean id="messageUtil" class="cn.mwee.utils.mq.MessageUtil">
17         <constructor-arg index="0" ref="jmsFactory"/>
18         <constructor-arg index="1" value="10000"/>
19         <constructor-arg index="2">
20             <list>
21                 <value>dest1</value>
22                 <value>dest2</value>
23             </list>
24         </constructor-arg>
25         <property name="autoBackup" value="true"/>
26     </bean>
27 
28 </beans>
View Code

main函數:

    public static void main(String[] args) throws InterruptedException, JMSException {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-sender.xml");
        ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class, "jmsFactory");
        MessageUtil messageUtil = context.getBean(MessageUtil.class);
//        sendScheduleMessage(messageUtil);
//        deleteAllScheduleMessage(connectionFactory);
        deleteExpiredScheduleMessage(connectionFactory);
    }

參考文章:

Enhanced JMS Scheduler in ActiveMQ


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()