本文章的完整代碼可從我的github中下載:https://github.com/huangbowen521/SpringJMSSample.git
上一篇文章中介紹了如何安裝和運行ActiveMQ。這一章主要講述如何使用Spring JMS向ActiveMQ的Message Queue中發消息和讀消息。
首先需要在項目中引入依賴庫。
spring-core: 用于啟動Spring容器,加載bean。
spring-jms:使用Spring JMS提供的API。
activemq-all:使用ActiveMQ提供的API。
在本示例中我使用maven來導入相應的依賴庫。
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<dependencies>
<dependency>
<groupId> junit</groupId>
<artifactId> junit</artifactId>
<version> 4.11</version>
<scope> test</scope>
</dependency>
<dependency>
<groupId> org.apache.activemq</groupId>
<artifactId> activemq-all</artifactId>
<version> 5.9.0</version>
</dependency>
<dependency>
<groupId> org.springframework</groupId>
<artifactId> spring-jms</artifactId>
<version> 4.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId> org.springframework</groupId>
<artifactId> spring-core</artifactId>
<version> 4.0.2.RELEASE</version>
</dependency>
</dependencies>
接下來配置與ActiveMQ的連接,以及一個自定義的MessageSender。
springJMSConfiguration.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns= "http://www.springframework.org/schema/beans"
xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation= "http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd" >
<bean class= "org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
<property name= "location" >
<value> application.properties</value>
</property>
</bean>
<!-- Activemq connection factory -->
<bean id= "amqConnectionFactory" class= "org.apache.activemq.ActiveMQConnectionFactory" >
<constructor-arg index= "0" value= "${jms.broker.url}" />
</bean>
<!-- ConnectionFactory Definition -->
<bean id= "connectionFactory" class= "org.springframework.jms.connection.CachingConnectionFactory" >
<constructor-arg ref= "amqConnectionFactory" />
</bean>
<!-- Default Destination Queue Definition-->
<bean id= "defaultDestination" class= "org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg index= "0" value= "${jms.queue.name}" />
</bean>
<!-- JmsTemplate Definition -->
<bean id= "jmsTemplate" class= "org.springframework.jms.core.JmsTemplate" >
<property name= "connectionFactory" ref= "connectionFactory" />
<property name= "defaultDestination" ref= "defaultDestination" />
</bean>
<!-- Message Sender Definition -->
<bean id= "messageSender" class= "huangbowen.net.jms.MessageSender" >
<constructor-arg index= "0" ref= "jmsTemplate" />
</bean>
</beans>
在此配置文件中,我們配置了一個ActiveMQ的connection factory,使用的是ActiveMQ提供的ActiveMQConnectionFactory類。然后又配置了一個Spring JMS提供的CachingConnectionFactory。我們定義了一個ActiveMQQueue作為消息的接收Queue。并創建了一個JmsTemplate,使用了之前創建的ConnectionFactory和Message Queue作為參數。最后自定義了一個MessageSender,使用該JmsTemplate進行消息發送。
以下MessageSender的實現。
MessageSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package huangbowen . net . jms ;
import org.springframework.jms.core.JmsTemplate ;
public class MessageSender {
private final JmsTemplate jmsTemplate ;
public MessageSender ( final JmsTemplate jmsTemplate ) {
this . jmsTemplate = jmsTemplate ;
}
public void send ( final String text ) {
jmsTemplate . convertAndSend ( text );
}
}
這個MessageSender很簡單,就是通過jmsTemplate發送一個字符串信息。
我們還需要配置一個Listener來監聽和處理當前的Message Queue。
springJMSReceiver.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns= "http://www.springframework.org/schema/beans"
xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation= "http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd" >
<!-- Message Receiver Definition -->
<bean id= "messageReceiver" class= "huangbowen.net.jms.MessageReceiver" >
</bean>
<bean class= "org.springframework.jms.listener.SimpleMessageListenerContainer" >
<property name= "connectionFactory" ref= "connectionFactory" />
<property name= "destinationName" value= "${jms.queue.name}" />
<property name= "messageListener" ref= "messageReceiver" />
</bean>
</beans>
在上述xml文件中,我們自定義了一個MessageListener,并且使用Spring提供的SimpleMessageListenerContainer作為Container。
以下是MessageLinser的具體實現。
MessageReceiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package huangbowen . net . jms ;
import javax.jms.* ;
public class MessageReceiver implements MessageListener {
public void onMessage ( Message message ) {
if ( message instanceof TextMessage ) {
TextMessage textMessage = ( TextMessage ) message ;
try {
String text = textMessage . getText ();
System . out . println ( String . format ( "Received: %s" , text ));
} catch ( JMSException e ) {
e . printStackTrace ();
}
}
}
}
這個MessageListener也相當的簡單,就是從Queue中讀取出消息以后輸出到當前控制臺中。
另外有關ActiveMQ的url和所使用的Message Queue的配置在application.properties文件中。
application.properties
1
2
jms.broker.url=tcp://localhost:61616
jms.queue.name=bar
好了,配置大功告成。如何演示那?我創建了兩個Main方法,一個用于發送消息到ActiveMQ的MessageQueue中,一個用于從MessageQueue中讀取消息。
SenderApp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package huangbowen . net ;
import huangbowen.net.jms.MessageSender ;
import org.springframework.context.ApplicationContext ;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.util.StringUtils ;
import java.io.BufferedReader ;
import java.io.IOException ;
import java.io.InputStreamReader ;
public class SenderApp
{
public static void main ( String [] args ) throws IOException {
MessageSender sender = getMessageSender ();
BufferedReader br = new BufferedReader ( new InputStreamReader ( System . in ));
String text = br . readLine ();
while (! StringUtils . isEmpty ( text )) {
System . out . println ( String . format ( "send message: %s" , text ));
sender . send ( text );
text = br . readLine ();
}
}
public static MessageSender getMessageSender () {
ApplicationContext context = new ClassPathXmlApplicationContext ( "springJMSConfiguration.xml" );
return ( MessageSender ) context . getBean ( "messageSender" );
}
}
ReceiverApp.java
1
2
3
4
5
6
7
8
9
10
package huangbowen . net ;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
public class ReceiverApp {
public static void main ( String [] args )
{
new ClassPathXmlApplicationContext ( "springJMSConfiguration.xml" , "springJMSReceiver.xml" );
}
}
OK,如果運行的話要先將ActiveMQ服務啟動起來(更多啟動方式參見我上篇文章)。
1
$: /usr/local/Cellar/activemq/5.8.0/libexec$ activemq start xbean:./conf/activemq-demo.xml
然后運行SenderApp中的Main方法,就可以在控制臺中輸入消息發送到ActiveMQ的Message Queue中了。運行ReceiverApp中的Main方法,則會從Queue中將消息讀出來,打印到控制臺。
這就是使用Spring JMS與ActiveMQ交互的一個簡單例子了。完整代碼可從https://github.com/huangbowen521/SpringJMSSample下載。