Spring+ActiveMQ+Mysql 配置JMS 一、準備一個可以運行的Spring環(huán)境 二,、下載ActiveMQ (下載地址) 2.1 先確保ActiveMQ運行正常,,直接運行 安裝目錄\bin\activemq.bat即可,, 注意:如果要以服務方式運行的話,,可以使用ActiveMQ 提供的工具 安裝目錄\bin\win32\InstallService.bat 確保以管理員方式運行 可以打開鏈接,, (http://localhost:8161/admin)查看是否安裝成功 三,、試用 單獨編寫消息發(fā)送者和消息接受以測試相應 消息發(fā)送 public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個發(fā)送或接收消息的線程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰. // 獲取session注意參數(shù)值Queue.Name是Query的名字 Destination destination = session.createQueue("[color=red]Queue.Name[/color]"); // MessageProducer:消息生產(chǎn)者 MessageProducer producer = session.createProducer(destination); // 設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 發(fā)送一條消息 sendMsg(session, producer); session.commit(); connection.close(); } /** * 在指定的會話上,通過指定的消息生產(chǎn)者發(fā)出一條消息 * * @param session 消息會話 * @param producer 消息生產(chǎn)者 */ public static void sendMsg(Session session, MessageProducer producer) throws JMSException { // 創(chuàng)建一條文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ,!"); // 通過消息生產(chǎn)者發(fā)出消息 producer.send(message); System.out.println(""); } 消息接收 public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個發(fā)送或接收消息的線程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰. // 獲取session注意參數(shù)值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 Destination destination = session.createQueue("Queue.Name"); // 消費者,,消息接收者 MessageConsumer consumer = session.createConsumer(destination); while(true) { TextMessage message = (TextMessage) consumer.receive(1000); if(null != message) System.out.println("收到消息:" + message.getText()); else break; } session.close(); connection.close(); } 開啟你的ActiveMQ服務器,,測試一下吧,。發(fā)送一個消息,,然后看看接收到的成果 四,、 Spring 注入 spring application.xml 文件配置 <!-- 配置JMS消息發(fā)送 --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <!-- Spring JMS Template --> <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0"> <value>Queue.Name</value> </constructor-arg> </bean> <bean id="sender" class="demo.JmsQueueSender"> <property name="jmsTemplate" ref="myJmsTemplate"></property> </bean> <bean id="receive" class="demo.JmsQueueReceiver"></bean> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"></property> <property name="messageListener" ref="receive"></property> <property name="destination" ref="destination" /> </bean> <!-- 配置JMS消息發(fā)送完成 --> 注意這里需要幾個包 ,,activeio-core-3.1.2.jar,activemq-all-5.5.0.jar,,activemq-pool-5.5.0.jar,,commons-pool-1.5.6.jar 剩下的就是在你的程序里面添加相應的消息發(fā)送和接收程序了 sender @Component public class JmsQueueSender { private JmsTemplate jmsTemplate; public void setConnectionFactory(ConnectionFactory cf) { this.jmsTemplate = new JmsTemplate(cf); } public void simpleSend() { jmsTemplate.convertAndSend("Queue.Name", "test!!!"); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } } receiver @Component public class JmsQueueReceiver implements MessageListener { @Override public void onMessage(Message message) { if(message instanceof TextMessage) { final TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch(final JMSException e) { e.printStackTrace(); } } } } 五,、配置ActiveMQ以數(shù)據(jù)庫的方式存儲消息 ActiveMQ安裝目錄\conf\activemq.xml 找到 <broker>標簽中的內(nèi)容 <persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter> 注釋掉以上內(nèi)容,添加自己的數(shù)據(jù)庫配置 <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> 配置以Mysql的方式保存消息 在<broker>標簽以外的地方添加數(shù)據(jù)源 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/> </bean> 將Mysql的包加到ActiveMQ的啟動Lib下 在Mysql數(shù)據(jù)中新建數(shù)據(jù)庫 activemq ,,ActiveMQ在啟動的時候會自動建表,。 OK 。,。,。。 重新啟動服務,。 這樣消息的發(fā)送者的消息將被保存到Mysql數(shù)據(jù)庫,,同時消息消耗者每讀取一條消息。數(shù)據(jù)庫中的消息也會相應的刪除,。 |
|