#情景引入
小白:起床起床起床起床,。,。,。??炱鸫瞺
我:怎么了又,,大驚小怪,,嚇到我了。
小白:我有事有事想找你,,十萬(wàn)火急呢~~
我:你能有什么事,?反正我不信。,。那你說(shuō)說(shuō)看~~
小白:就是我有兩個(gè)小表弟,,叫大白和二白,他們現(xiàn)在每天睡覺(jué)之前都要分別和我聊天,,讓我給他們講故事,,如果不講他們就不睡覺(jué),。但是,,如果一個(gè)個(gè)的跟他們輪流來(lái)說(shuō)的話,我就需要每天說(shuō)兩遍,,而且我還要找準(zhǔn)他們的時(shí)間點(diǎn),,這個(gè)有時(shí)候我有事情都無(wú)法實(shí)現(xiàn)這個(gè)問(wèn)題,他們就會(huì)很生氣,。,。。
我:這不是挺好的嘛,,小孩子就是愛(ài)聽(tīng)故事的呀,。。,。
小白:我也愿意講,,但是時(shí)間這個(gè)不是很好控制,有沒(méi)有類(lèi)似,,比如我可以之前就描述好了,,然后定點(diǎn)給他們兩個(gè)一起發(fā)消息,而可以拋開(kāi)時(shí)間和其他因素的影響呢,?
我:這個(gè)嘛,,很簡(jiǎn)單呀,你可以讓他們關(guān)注你的一個(gè)公眾號(hào),,這樣你再定時(shí)的推送給他們故事不就可以了嘛,。?;蛘?,你可以拉他們進(jìn)你的一個(gè)群這樣,就方便了呀~
小白:這樣是可以,,但是如果以后還有小表妹要聽(tīng)我講,,我就要如此反復(fù)的做,。。感謝好麻煩好麻煩,。,。。
我:emmm,,我理解你的意思,,你就想實(shí)現(xiàn)一種很多人都能夠進(jìn)行類(lèi)似一種消息推送的方式嘛。,。,。
小白:對(duì)的對(duì)的。,。就是這樣一種,,,,,我記得我們?cè)诩夹g(shù)方面好像也有一種類(lèi)似的技術(shù),,這個(gè)叫做什么去了呢?
我:這就是消息中間件,,一種生產(chǎn)者和消費(fèi)者的關(guān)系,。
小白:我也想學(xué)我也想學(xué),,,你快給我講講,,給我講講唄。,。
我:真拿你沒(méi)辦法,,好吧。,。,。下面我就給你講一下這方面的知識(shí)。
#情景分析
其實(shí),,小白的這個(gè)問(wèn)題,,是一種比較普遍的問(wèn)題。既然我們作為技術(shù)人員,,當(dāng)然我們就要從技術(shù)成分去分析如何解決了,。這里面其實(shí)就是包含著一種消息中間件的技術(shù)。它也是最近技術(shù)層面用得非常非常多的,,這也是非常值得我們進(jìn)行學(xué)習(xí),。。這在如今的秒殺系統(tǒng),,推薦系統(tǒng)等等,,都有廣泛的應(yīng)用,。。所以,,這章我就主要來(lái)跟大家說(shuō)說(shuō)這方面的知識(shí),。
#基本概念的引導(dǎo)
本模塊主要講解關(guān)于消息中間件的相關(guān)基礎(chǔ)知識(shí),也是方便我們后面的學(xué)習(xí),。
###什么是中間件,?
非操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,,不是直接給最終用戶使用,,不能直接給用戶帶來(lái)價(jià)值的軟件,我們就可以稱(chēng)為中間件(比如Dubbo,,Tomcat,,Jetty,Jboss都是屬于的),。
###什么是消息中間件,?
百度百科解釋?zhuān)合⒅虚g件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成,。通過(guò)提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信,。
關(guān)鍵點(diǎn):關(guān)注于數(shù)據(jù)的發(fā)送和接受,,利用高效可靠的異步消息機(jī)制傳遞機(jī)制集成分布式系統(tǒng)。
先簡(jiǎn)單的用下面這個(gè)圖說(shuō)明:
###為什么要使用消息中間件
舉幾個(gè)例子,,我想你就會(huì)明白了,。(其實(shí)使用消息中間件主要就是為了解耦合和異步兩個(gè)作用)
1:微博,都用過(guò)吧,。那么,,當(dāng)我們新關(guān)注一個(gè)用戶,那么系統(tǒng)會(huì)相應(yīng)的推送消息給我們,,并且還做了很多關(guān)于我們關(guān)注的處理,。這就是消息中間件的異步。
2:秒殺系統(tǒng),。100件商品,,幾十萬(wàn)個(gè)人在搶?zhuān)沁@個(gè)怎么弄呢?總不能就把服務(wù)器給宕機(jī)了吧,。那么就可以把用戶的請(qǐng)求進(jìn)行緩存,,然后再異步處理。
3:系統(tǒng)A給系統(tǒng)B進(jìn)行通信,,而系統(tǒng)B需要對(duì)A的消息進(jìn)行相應(yīng)處理之后才能給A反饋,,這時(shí)候,,總不能讓A就傻傻等著吧。那么,,這就是異步的功能,。
###什么是JMS?
Java消息服務(wù)(Java Message Service)應(yīng)用程序接口是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,,用于在兩個(gè)應(yīng)用程序之間,,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信,。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API,,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
總結(jié)起來(lái)說(shuō)就是:Java對(duì)于應(yīng)用程序之間進(jìn)行信息交互的API(而且是異步),。
里面有下面的概念需要理解,,對(duì)后續(xù)有幫助:
-
提供者:實(shí)現(xiàn)JMS的消息服務(wù)中間件服務(wù)器。
-
客戶端:發(fā)送或接受消息的應(yīng)用,。
-
生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端,。
-
消費(fèi)者/訂閱者:接受并處理消息的客戶端。
-
消息:應(yīng)用程序之間傳遞的數(shù)據(jù),。
-
消息模式:在客戶端之間傳遞消息的模式,,JMS主要是隊(duì)列模式和主體模式。
-
隊(duì)列模式特點(diǎn):
(1)客戶端包括生產(chǎn)者和消費(fèi)者,。
(2)隊(duì)列中的一個(gè)消息只能被一個(gè)消費(fèi)者使用,。
(3)消費(fèi)者可以隨時(shí)取消息。
-
主體模式特點(diǎn):
(1)客戶端包括發(fā)布者和訂閱者,。
(2)主題中的消息可以被所有訂閱者消費(fèi),。
(3)消費(fèi)者不能消費(fèi)訂閱之前發(fā)送的消息。
###什么是AMQP,?
AMQP,,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,,并不受客戶端/中間件不同產(chǎn)品,不同的開(kāi)發(fā)語(yǔ)言等條件的限制,。
簡(jiǎn)單點(diǎn)說(shuō):就是對(duì)于消息中間件所接受的消息傳輸層的協(xié)議(不懂傳輸層,,那么就需要多看看計(jì)算機(jī)網(wǎng)絡(luò)相關(guān)知識(shí)了,OSI的層次劃分),,只有這樣才能保證客戶端和消息中間件能夠進(jìn)行交互(換位思考:HTTP和HTTPS甚至說(shuō)是TCP/IP與UDP協(xié)議都要的道理),。
emmm,比較一下JMS和AMQP的不同吧,。,。
-
JMS是定義與Java,,而AMQP是一種傳輸層協(xié)議。
-
JMS是屬于Java的API,,而AMQP是跨語(yǔ)言的,。
-
JMS消息類(lèi)型只有兩種(主題和隊(duì)列,后續(xù)會(huì)說(shuō)),,而AMQP是有五種,。
-
JMS主要就是針對(duì)Java的開(kāi)發(fā)的Client,而AMQP是面向消息,,隊(duì)列,,路由。
###什么是ActiveMQ呢,?
ActiveMQ 是Apache出品,,最流行的,能力強(qiáng)勁的開(kāi)源消息總線,。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),,盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位,。
簡(jiǎn)單點(diǎn)說(shuō):不就是為了實(shí)現(xiàn)我上述所想要的需求嘛,。然后它就是一種實(shí)現(xiàn)的方式。就比如,,Tomcat是什么,?不就是為了實(shí)現(xiàn)一種client與服務(wù)器之間的交互的一種產(chǎn)品嘛。,。所以,不需要死記概念,,自己理解就好,。
#ActiveMQ的安裝
##環(huán)境:Windows
步驟:
(1)登錄到ActiveMQ的官網(wǎng),下載安裝包,。http://activemq./activemq-5154-release.html
(2)下載Zip文件
(3)解壓Zip文件,,目錄如下
(4)啟動(dòng)ActiveMQ服務(wù)(注意:要右鍵以管理員身份進(jìn)行運(yùn)行)
注意:有兩種方式,第一種就是類(lèi)似tomcat啟動(dòng),,那么啟動(dòng)圖會(huì)一直顯示,。
而第二種的話,就是把這個(gè)ActiveMQ注冊(cè)到服務(wù)列表中,,這樣更方便我們進(jìn)行操作,。(推薦使用這種)
(5)登錄,驗(yàn)證是否啟動(dòng)成功
(6)進(jìn)入管理頁(yè)面
OK,,進(jìn)入之后就可以看我們的管理頁(yè)面啦,。,。。是不是很簡(jiǎn)單呢,?
##環(huán)境:Linux
步驟:(多余的我就不多說(shuō)了,。。,。請(qǐng)看windows的步驟)
(1)同樣需要下載對(duì)應(yīng)的文件,。后綴為tar.gz的這樣的。其實(shí)可以直接通過(guò)下面的這個(gè)命令下載,,快速一點(diǎn),,免得要移動(dòng)到Linux(注意:如果是通過(guò)ssh連接的方式的話)。
wget https://mirrors.tuna./apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz
(2)然后解壓下載的文件
(3)同樣進(jìn)入相對(duì)應(yīng)的目錄,,運(yùn)行
./activemq start
(4)然后再訪問(wèn)相同的地址就可以看到啦,。(具體看windows安裝步驟)
#ActiveMQ的使用(基于Maven)
首先要再回頭看看JMS中的一些關(guān)鍵接口。
- ConnectionFactory:用于創(chuàng)建連接到消息中間件的連接工廠,。
- Connection:代表了應(yīng)用程序和服務(wù)之間的連接通路,。
- Destination:指消息發(fā)布的地點(diǎn),包括隊(duì)列模式和主體模式,。
- Session:表示一個(gè)單線程的上下文,,用于發(fā)送和接受消息。
- MessageConsumer:由會(huì)話創(chuàng)建,,用于接受發(fā)送到目的的消息,。
- MessageProducer:由會(huì)話創(chuàng)建,用于發(fā)送消息,。
- Message:是在消費(fèi)者和生產(chǎn)者之間傳遞的對(duì)象,,消息頭,一組消息屬性,,和一個(gè)消息體,。
環(huán)境:IDEA
步驟:
- 使用IDEA創(chuàng)建一個(gè)Maven項(xiàng)目,最簡(jiǎn)單的骨架即可(quick)
- 導(dǎo)入ActiveMq的依賴
<!--添加activemq的依賴-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
###情形一:隊(duì)列模型的消息
3. 編寫(xiě)生產(chǎn)者代碼(使用隊(duì)列模型的消息)
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:06 2018/7/14 0014
* @ Description:用于消息的創(chuàng)建類(lèi)
* @ Modified By:
* @Version: $version$
*/
public class MessageProducer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建一個(gè)生產(chǎn)者
javax.jms.MessageProducer producer = session.createProducer(destination);
//創(chuàng)建模擬100個(gè)消息
for (int i = 1 ; i <= 100 ; i++){
TextMessage message = session.createTextMessage("我發(fā)送message:" + i);
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
}
//關(guān)閉連接
connection.close();
}
}
- 查看是否消息產(chǎn)生成功
- 編寫(xiě)消費(fèi)者代碼(消費(fèi)隊(duì)列模型的消息)
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:30 2018/7/14 0014
* @ Description:消息消費(fèi)者
* @ Modified By:
* @Version: $version$
*/
public class MessageConsumer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建消費(fèi)者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建消費(fèi)的監(jiān)聽(tīng)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("獲取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
-
查看是否進(jìn)行了消費(fèi)
**備注:**我上面進(jìn)行的是隊(duì)列模式的消息,,而且進(jìn)行的都是單個(gè)消費(fèi)者,,那如果我換成同時(shí)有兩個(gè)消費(fèi)者消費(fèi)生產(chǎn)者的消息會(huì)怎么樣呢?(我們只需要運(yùn)行兩個(gè)消費(fèi)者就可以啦,。當(dāng)然,,要保證生產(chǎn)者是產(chǎn)生了消息的哦~~~~否則,拿什么消費(fèi)呢~)
一個(gè)生產(chǎn)者,,兩個(gè)消費(fèi)者的情況如下:
切記:先運(yùn)行兩個(gè)消費(fèi)者,,然后再運(yùn)行生產(chǎn)者代碼:
結(jié)果如下:
其實(shí),這就是解釋了,我之前說(shuō)的,,隊(duì)列模式的消息,,是只會(huì)被一個(gè)消費(fèi)者所使用的,而不會(huì)被共享,,這也就是和主題模型的差別哦~~~哈哈
###情形二:主題模型的消息
前面的步驟都一樣,,只是生產(chǎn)者和消費(fèi)者的代碼有點(diǎn)區(qū)別:
-
編寫(xiě)生產(chǎn)者(這個(gè)和隊(duì)列模型其實(shí)很像,稍微修改就可以)
package com.hnu.scw.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:48 2018/7/14 0014
* @ Description:${description}
* @ Modified By:
* @Version: $version$
*/
public class MessageTopicProducer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的主題名稱(chēng)
private static final String TOPIC_NAME = "MyTopicMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createTopic(TOPIC_NAME);
//創(chuàng)建一個(gè)生產(chǎn)者
javax.jms.MessageProducer producer = session.createProducer(destination);
//創(chuàng)建模擬100個(gè)消息
for (int i = 1; i <= 100; i++) {
TextMessage message = session.createTextMessage("當(dāng)前message是(主題模型):" + i);
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
}
//關(guān)閉連接
connection.close();
}
}
- 查看生產(chǎn)者的消息
- 編寫(xiě)消費(fèi)者
package com.hnu.scw.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:50 2018/7/14 0014
* @ Description:${description}
* @ Modified By:
* @Version: $version$
*/
public class MessageTopicConsumer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String TOPIC_NAME = "MyTopicMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createTopic(TOPIC_NAME);
//創(chuàng)建消費(fèi)者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建消費(fèi)的監(jiān)聽(tīng)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("獲取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
- 查看是否消費(fèi)成功
然而,,我們運(yùn)行消費(fèi)者代碼,,發(fā)現(xiàn)怎么沒(méi)有消息消費(fèi)呢?,?,??,?,??,?,?
其實(shí),這就是主題模型的一個(gè)特點(diǎn),,如果消費(fèi)者是在生產(chǎn)者產(chǎn)生消息之后來(lái)的,,那么是不會(huì)對(duì)之前的消息進(jìn)行消費(fèi)的哦。,?!,F(xiàn)在知道它們的區(qū)別在哪了吧,。
如果,,現(xiàn)在是兩個(gè)消費(fèi)者和一個(gè)生產(chǎn)者的主題模型又是怎么的結(jié)果呢?
哎喲,。,。。,。這種情況消費(fèi)者都各自消費(fèi)了所有的生產(chǎn)者的消息耶。,。,。。,。這就是共享性消息的主題模式,,這就是和隊(duì)列模型的區(qū)別,,,,大家好好的對(duì)比哦~~
#ActiveMQ使用(基于Spring)
步驟:
- 創(chuàng)建一個(gè)Maven項(xiàng)目(基于最簡(jiǎn)單的quick骨架即可)
- 導(dǎo)入Spring和ActiveMQ的相關(guān)依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hnu.scw</groupId>
<artifactId>activemq</artifactId>
<version>1.0-SNAPSHOT</version>
<name>activemq</name>
<!-- FIXME change it to the project's website -->
<url>http://www.</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<spring.version>4.2.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--添加activemq的依賴-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!--spring整合activemq所需要的依賴-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- see http://maven./ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
- 編寫(xiě)生產(chǎn)者的配置文件.xml,,取名為producer.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www./schema/beans"
xmlns:xsi="http://www./2001/XMLSchema-instance"
xmlns:context="http://www./schema/context"
xsi:schemaLocation="http://www./schema/beans
http://www./schema/beans/spring-beans.xsd
http://www./schema/context
http://www./schema/context/spring-context.xsd ">
<context:annotation-config />
<!--Activemq的連接工廠-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!--spring jms為我們提供的連接池 獲取一個(gè)連接工廠-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 消息目的地 點(diǎn)對(duì)點(diǎn)的模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SpringActiveMQMsg"/>
</bean>
<!-- jms模板 用于進(jìn)行消息發(fā)送-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
</beans>
- 編寫(xiě)生產(chǎn)者的接口
package com.hnu.scw.spring;
/**
* @ Author :scw
* @ Date :Created in 下午 12:19 2018/7/14 0014
* @ Description:生產(chǎn)者的接口
* @ Modified By:
* @Version: $version$
*/
public interface ProduceService {
void sendMessage(String msg);
}
- 編寫(xiě)生產(chǎn)者的實(shí)現(xiàn)
package com.hnu.scw.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 下午 2:21 2018/7/15 0015
* @ Description:生產(chǎn)者的實(shí)現(xiàn)類(lèi)
* @ Modified By:
* @Version: $version$
*/
public class ProduceServiceImpl implements ProduceService {
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
private Destination destination;
/**
* 發(fā)送消息
* @param msg
*/
@Override
public void sendMessage(final String msg) {
jmsTemplate.send(destination , new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(msg);
return textMessage;
}
});
System.out.println("現(xiàn)在發(fā)送的消息為: " + msg);
}
}
- 將生產(chǎn)者的類(lèi)添加到上述的配置文件中
<!--注入我們的生產(chǎn)者-->
<bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
- 編寫(xiě)生產(chǎn)者的測(cè)試類(lèi)
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ Author :scw
* @ Date :Created in 下午 2:27 2018/7/15 0015
* @ Description:生產(chǎn)者的測(cè)試
* @ Modified By:
* @Version: $version$
*/
public class ProducerTest {
public static void main(String[] args){
ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml");
ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class);
//進(jìn)行發(fā)送消息
for (int i = 0; i < 100 ; i++) {
bean.sendMessage("test" + i);
}
//當(dāng)消息發(fā)送完后,關(guān)閉容器
classPathXmlApplicationContext.close();
}
}
- 運(yùn)行測(cè)試類(lèi),,查看生產(chǎn)者是否產(chǎn)生消息成功
通過(guò)上述的界面,,就可以看到自己配置的隊(duì)列模式的消息產(chǎn)生成功。
- 編寫(xiě)消費(fèi)者的消息監(jiān)聽(tīng)類(lèi)
- 編寫(xiě)消費(fèi)者的配置文件,,命名為consumer.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www./schema/beans"
xmlns:xsi="http://www./2001/XMLSchema-instance"
xmlns:context="http://www./schema/context"
xsi:schemaLocation="http://www./schema/beans
http://www./schema/beans/spring-beans.xsd
http://www./schema/context
http://www./schema/context/spring-context.xsd ">
<context:annotation-config />
<!--Activemq的連接工廠-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!--spring jms為我們提供的連接池 獲取一個(gè)連接工廠-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 消息目的地 點(diǎn)對(duì)點(diǎn)的模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SpringActiveMQMsg"/>
</bean>
<!-- 配置消息監(jiān)聽(tīng)器-->
<bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/>
<!--配置消息容器-->
<bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--配置連接工廠-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--配置監(jiān)聽(tīng)的隊(duì)列-->
<property name="destination" ref="queueDestination"/>
<!--配置消息監(jiān)聽(tīng)器-->
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
- 消息消費(fèi)者ComsumerMessageListener類(lèi)代碼
package com.hnu.scw.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @ Author :scw
* @ Date :Created in 下午 3:06 2018/7/15 0015
* @ Description:消息的監(jiān)聽(tīng)者,,用于處理消息
* @ Modified By:
* @Version: $version$
*/
public class ComsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 編寫(xiě)測(cè)試文件,測(cè)試消費(fèi)者消費(fèi)消息是否成功
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ Author :scw
* @ Date :Created in 下午 3:13 2018/7/15 0015
* @ Description:消費(fèi)者的測(cè)試
* @ Modified By:
* @Version: $version$
*/
public class ConsumerTest {
public static void main(String[] args){
//啟動(dòng)消費(fèi)者
ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml");
}
}
- 查看ActiveMQ網(wǎng)站具體消息情況
- ActiveMQ的隊(duì)列模型就大功告成啦,。,。。,。,。。so easy?。,。?br>
備注:上面都是進(jìn)行的ActiveMQ的隊(duì)列模型的配置,,那么我們?nèi)绻脒M(jìn)行主題模型的又是如何進(jìn)行操作呢,?其實(shí)也很簡(jiǎn)單,只需要修改生產(chǎn)者的xml文件里面的隊(duì)列即可,。比如如下代碼:
<!-- 消息目的地 (主題模式)-->
<!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!–配置隊(duì)列模型的消息名稱(chēng)–>
<constructor-arg value="SpringActiveMQMsgTopic"/>
</bean>-->
將上面的代碼替換之前的就可以了,。。,。
總結(jié):總的來(lái)說(shuō),,基于Spring來(lái)使用消息隊(duì)列還是非常方便的,這比我們正常進(jìn)行JMS規(guī)范操作要簡(jiǎn)單很多,,畢竟很多對(duì)象都是通過(guò)Spring的IOC進(jìn)行容器管理了,,所以,值得推薦使用哦~~~
#ActiveMQ的集群
###為什么要進(jìn)行集群呢,?
原因一:實(shí)現(xiàn)高可用:以排除單點(diǎn)故障所引起的服務(wù)終端,。
原因二:實(shí)現(xiàn)負(fù)載均衡:以提升效率為更多的客戶進(jìn)行服務(wù)。
###集群的方式有哪些,?
方式一:客戶端集群:多個(gè)客戶端消費(fèi)同一個(gè)隊(duì)列,。
方式二:Broker clusters:多個(gè)Broker之間同步消息。(實(shí)現(xiàn)負(fù)載均衡)
這個(gè)的實(shí)現(xiàn)原理主要是通過(guò)網(wǎng)絡(luò)連接器來(lái)進(jìn)行,。
網(wǎng)絡(luò)連接器:用于配置ActiveMQ服務(wù)器與服務(wù)器之間的網(wǎng)絡(luò)通訊方式,,用于服務(wù)器透析消息。主要分為靜態(tài)連接和動(dòng)態(tài)連接。
方式三:Master Slave :實(shí)現(xiàn)高可用,。
這種方式的話,,可以聯(lián)想到Mysql的主從配置和Zookeeper的負(fù)載均衡的主競(jìng)爭(zhēng)關(guān)系master。
我們?cè)趯?shí)際的開(kāi)發(fā)中,,一般都是將方式二和方式三進(jìn)行集成,,從而實(shí)現(xiàn)高可用和負(fù)載均衡。下面的話,,我也就這樣的配置思想來(lái)進(jìn)行講解:(通過(guò)三臺(tái)服務(wù)器來(lái)模擬消息集群的實(shí)現(xiàn))
其中的NodeB和NodeC就是一張Master/slave的關(guān)系,。都可以成為主服務(wù)器。(只要它們某一個(gè)宕機(jī),,那么就會(huì)其余的一臺(tái)就進(jìn)行繼續(xù)服務(wù))
###搭建步驟(基于Windows環(huán)境,,而Linux環(huán)境也是一樣的操作)
三臺(tái)服務(wù)器的大體功能和描述:
由于自己沒(méi)有三臺(tái)服務(wù)器,所以就用自己的一臺(tái)電腦來(lái)模擬三臺(tái)消息服務(wù)器,,其實(shí)這個(gè)就是假設(shè)有三個(gè)不同ActiveMQ消息服務(wù)器了,。
- 復(fù)制三個(gè)ActiveMQ的服務(wù)配置到一個(gè)公共目錄
- 修改activeMQA的配置文件
只需要在activemq.xml添加如下內(nèi)容:
<networkConnectors>
<networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
</networkConnectors>
- 修改ActiveMQB的配置文件
(1)首先在activemq,xml中添加如下內(nèi)容:
<!--修改服務(wù)端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
<networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
<!--并修改下面這個(gè)標(biāo)簽的內(nèi)容 , 作為B和C的共享文件,,目錄就是自己之前創(chuàng)建的一個(gè)文件(可以回看上面的整個(gè)結(jié)構(gòu))-->
<persistenceAdapter>
<kahaDB directory="D:\Download\MQJiQun\shareDB"/>
</persistenceAdapter>
(2)修改jetty.xml內(nèi)容,,修改服務(wù)器的服務(wù)端口
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
- 修改ActiveMQC的配置文件(其實(shí)類(lèi)似和B一樣,只是服務(wù)端口不一樣)
(1)修改activemq.xml中的內(nèi)容
<!--修改服務(wù)端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
<networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
<!--并修改下面這個(gè)標(biāo)簽的內(nèi)容 ,, 作為B和C的共享文件,,目錄就是自己之前創(chuàng)建的一個(gè)文件(可以回看上面的整個(gè)結(jié)構(gòu))-->
<persistenceAdapter>
<kahaDB directory="D:\Download\MQJiQun\shareDB"/>
</persistenceAdapter>
(2)修改jetty.xml中的內(nèi)容
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
- 集群搭建完成~~~~
集群測(cè)試(基于IDEA編輯器+Maven)
步驟:
(1)創(chuàng)建Maven項(xiàng)目
(2)導(dǎo)入依賴
<!--添加activemq的依賴-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
(3)編寫(xiě)生產(chǎn)者代碼
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:06 2018/7/14 0014
* @ Description:用于消息的創(chuàng)建類(lèi)
* @ Modified By:
* @Version: $version$
*/
public class MessageProducer {
//通過(guò)集群的方式進(jìn)行消息服務(wù)器的管理(failover就是進(jìn)行動(dòng)態(tài)轉(zhuǎn)移,當(dāng)某個(gè)服務(wù)器宕機(jī),,
// 那么就進(jìn)行其他的服務(wù)器選擇,randomize表示隨機(jī)選擇)
private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建一個(gè)生產(chǎn)者
javax.jms.MessageProducer producer = session.createProducer(destination);
//創(chuàng)建模擬100個(gè)消息
for (int i = 1 ; i <= 100 ; i++){
TextMessage message = session.createTextMessage("當(dāng)前message是:" + i);
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
}
//關(guān)閉連接
connection.close();
}
}
(4)編寫(xiě)消費(fèi)者代碼
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:30 2018/7/14 0014
* @ Description:消息消費(fèi)者
* @ Modified By:
* @Version: $version$
*/
public class MessageConsumer {
//通過(guò)集群的方式進(jìn)行消息服務(wù)器的管理(failover就是進(jìn)行動(dòng)態(tài)轉(zhuǎn)移,,當(dāng)某個(gè)服務(wù)器宕機(jī),
// 那么就進(jìn)行其他的服務(wù)器選擇,randomize表示隨機(jī)選擇)
private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建消費(fèi)者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建消費(fèi)的監(jiān)聽(tīng)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("獲取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
(5)進(jìn)行查看各自的服務(wù)器的消息隊(duì)列的情況,。
- 首先,,是要確保三個(gè)ActiveMQ服務(wù)器都進(jìn)行打開(kāi)。分析:當(dāng)三個(gè)都服務(wù)都運(yùn)行之后,,我們從瀏覽器運(yùn)行各自的地址,,會(huì)發(fā)現(xiàn):
比如:我這里的三個(gè)服務(wù)的地址分別如下:
- http://127.0.0.1:8161/
- http://127.0.0.1:8162/
- http://127.0.0.1:8163/
###重點(diǎn)
為什么前面兩個(gè)都可以訪問(wèn),而第三個(gè)不可以呢,?(同樣也是按照我的這樣的服務(wù)器打開(kāi)方式哦,。先打開(kāi)的服務(wù)器A,接著B(niǎo),,最后C)但是,,運(yùn)行的時(shí)候,提示都成功了呀,。。為什么為什么?,?,?
分析:其實(shí)很簡(jiǎn)單,我說(shuō)過(guò)B和C是一種master/slave的方式,,當(dāng)B運(yùn)行之后就獲得了master的權(quán)限,,那么C服務(wù)是可以看到是一種監(jiān)聽(tīng)的狀態(tài),只有當(dāng)B宕機(jī)之后,,才有可能獲取master的資源權(quán)限,,所以,這時(shí)候C的地址當(dāng)然就無(wú)法訪問(wèn)啦,。這就是負(fù)載均衡的一種主/從服務(wù)的結(jié)構(gòu),。當(dāng)然,你可以試著先打開(kāi)C,,再打開(kāi)B,,這時(shí)候效果就反過(guò)來(lái)了。歡迎嘗試哦~~~
- 再運(yùn)行MessageProducer的類(lèi),,用于產(chǎn)生消息,。這時(shí)候,大家可以去查看每個(gè)服務(wù)器的地址,,來(lái)觀察消息的產(chǎn)生情況,。我的如下:
我的消息是產(chǎn)生在服務(wù)器B的里面啦。,。,。。,。,。
- 再運(yùn)行MessageConsumer的類(lèi),用于消費(fèi)消息,。這時(shí)候,,同樣可以去查看每個(gè)服務(wù)器的地址中的消息隊(duì)列的情況,來(lái)觀察消息的消費(fèi)情況,。我的如下:
- 如果,,我們?cè)谏a(chǎn)者產(chǎn)生了消息之后,服務(wù)器B突然宕機(jī)了怎么辦怎么辦,?,?
分析:其實(shí),這時(shí)候服務(wù)器C就一樣有消息保存進(jìn)行同步了,。,。是不是這樣就是一種高可用的架構(gòu)了呢,??,?,?大家,可以試試哦,。,。把B服務(wù)器關(guān)掉,再去訪問(wèn)服務(wù)器C的地址,,就發(fā)現(xiàn)如下的結(jié)果,。
這時(shí)候服務(wù)器C就作為了master,所以,,類(lèi)似zookeeper就是這樣的一種方式的哦,。~
###總結(jié)
好了,對(duì)于集群方面的簡(jiǎn)單使用就到這里了,。其實(shí)已經(jīng)可以根據(jù)這個(gè)進(jìn)行擴(kuò)展了,,所以,小伙伴要好好理解這里面的過(guò)程和作用,,這樣才能夠?qū)W以致用,。。,。
#其他的消息中間件
其實(shí),,類(lèi)似ActiveMQ這樣的消息中間件,用得比較多的還有就是RabbitMQ和Kafka,。它們?nèi)吒髯杂懈髯缘膬?yōu)勢(shì),。大家可以百度進(jìn)行了解,我就不進(jìn)行多說(shuō)了,。后面我會(huì)同樣把這兩種消息中間件的使用進(jìn)行詳細(xì)的講解,,歡迎大家的關(guān)注哦~總的來(lái)說(shuō),只有適合的場(chǎng)景對(duì)應(yīng)的消息中間件才能發(fā)揮最大的作用,,沒(méi)有一種是只有好處而沒(méi)有壞處的~
#總結(jié)
- 主要是對(duì)消息中間件的基礎(chǔ)知識(shí)進(jìn)行講解,。
- 主要講解ActiveMQ的使用
- 主要講解了關(guān)于ActiveMQ的集群的搭建
- 稍微提到了類(lèi)似ActiveMQ消息中間件的其他中間件
- 我所講述的內(nèi)容,夠大家進(jìn)行入門(mén)了,,如果要進(jìn)行深入的了解還是需要慢慢的去熟悉和學(xué)習(xí)的,,而且消息中間件是非常重要的一個(gè)技術(shù),希望大家去好好的了解,。
- 最后,,感謝各位的閱讀哦~~~~
|