文章出處

本文章的完整代碼可從我的github中下載:https://github.com/huangbowen521/SpringJMSSample.git

上一篇文章中介紹了如何安裝和運行ActiveMQ。這一章主要講述如何使用Spring JMS向ActiveMQ的Message Queue中發消息和讀消息。

首先需要在項目中引入依賴庫。

  • spring-core: 用于啟動Spring容器,加載bean。

  • spring-jms:使用Spring JMS提供的API。

  • activemq-all:使用ActiveMQ提供的API。

在本示例中我使用maven來導入相應的依賴庫。

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.9.0</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>4.0.2.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>4.0.2.RELEASE</version>
      </dependency>
  </dependencies>

接下來配置與ActiveMQ的連接,以及一個自定義的MessageSender。

springJMSConfiguration.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location">
            <value>application.properties</value>
        </property>
    </bean>

    <!-- Activemq connection factory -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="${jms.broker.url}"/>
    </bean>

    <!-- ConnectionFactory Definition -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"/>
    </bean>

    <!--  Default Destination Queue Definition-->
    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${jms.queue.name}"/>
    </bean>

    <!-- JmsTemplate Definition -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="defaultDestination"/>
    </bean>

    <!-- Message Sender Definition -->
    <bean id="messageSender" class="huangbowen.net.jms.MessageSender">
        <constructor-arg index="0" ref="jmsTemplate"/>
    </bean>
</beans>

在此配置文件中,我們配置了一個ActiveMQ的connection factory,使用的是ActiveMQ提供的ActiveMQConnectionFactory類。然后又配置了一個Spring JMS提供的CachingConnectionFactory。我們定義了一個ActiveMQQueue作為消息的接收Queue。并創建了一個JmsTemplate,使用了之前創建的ConnectionFactory和Message Queue作為參數。最后自定義了一個MessageSender,使用該JmsTemplate進行消息發送。

以下MessageSender的實現。

MessageSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package huangbowen.net.jms;

import org.springframework.jms.core.JmsTemplate;

public class MessageSender {

    private final JmsTemplate jmsTemplate;

    public MessageSender(final JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void send(final String text) {
        jmsTemplate.convertAndSend(text);
    }
}

這個MessageSender很簡單,就是通過jmsTemplate發送一個字符串信息。

我們還需要配置一個Listener來監聽和處理當前的Message Queue。

springJMSReceiver.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Message Receiver Definition -->
    <bean id="messageReceiver" class="huangbowen.net.jms.MessageReceiver">
    </bean>
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${jms.queue.name}"/>
        <property name="messageListener" ref="messageReceiver"/>
    </bean>

</beans>

在上述xml文件中,我們自定義了一個MessageListener,并且使用Spring提供的SimpleMessageListenerContainer作為Container。

以下是MessageLinser的具體實現。

MessageReceiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package huangbowen.net.jms;

import javax.jms.*;

public class MessageReceiver implements MessageListener {

    public void onMessage(Message message) {
        if(message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println(String.format("Received: %s",text));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

這個MessageListener也相當的簡單,就是從Queue中讀取出消息以后輸出到當前控制臺中。

另外有關ActiveMQ的url和所使用的Message Queue的配置在application.properties文件中。

application.properties
1
2
jms.broker.url=tcp://localhost:61616
jms.queue.name=bar

好了,配置大功告成。如何演示那?我創建了兩個Main方法,一個用于發送消息到ActiveMQ的MessageQueue中,一個用于從MessageQueue中讀取消息。

SenderApp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package huangbowen.net;

import huangbowen.net.jms.MessageSender;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.util.StringUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class SenderApp
{
    public static void main( String[] args ) throws IOException {
        MessageSender sender = getMessageSender();
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        String text = br.readLine();

        while (!StringUtils.isEmpty(text)) {
            System.out.println(String.format("send message: %s", text));
            sender.send(text);
            text = br.readLine();
        }
    }

    public static MessageSender getMessageSender() {
        ApplicationContext context = new ClassPathXmlApplicationContext("springJMSConfiguration.xml");
       return (MessageSender) context.getBean("messageSender");
    }
}
ReceiverApp.java
1
2
3
4
5
6
7
8
9
10
package huangbowen.net;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ReceiverApp {
    public static void main( String[] args )
    {
        new ClassPathXmlApplicationContext("springJMSConfiguration.xml", "springJMSReceiver.xml");
    }
}

OK,如果運行的話要先將ActiveMQ服務啟動起來(更多啟動方式參見我上篇文章)。

1
$:/usr/local/Cellar/activemq/5.8.0/libexec$ activemq start xbean:./conf/activemq-demo.xml

然后運行SenderApp中的Main方法,就可以在控制臺中輸入消息發送到ActiveMQ的Message Queue中了。運行ReceiverApp中的Main方法,則會從Queue中將消息讀出來,打印到控制臺。

這就是使用Spring JMS與ActiveMQ交互的一個簡單例子了。完整代碼可從https://github.com/huangbowen521/SpringJMSSample下載。


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()