簡(jiǎn)介
在前面一篇文章里討論過(guò)幾種應(yīng)用系統(tǒng)集成的方式,,發(fā)現(xiàn)實(shí)際上面向消息隊(duì)列的集成方案算是一個(gè)總體比較合理的選擇,。這里,,我們先針對(duì)具體的一個(gè)消息隊(duì)列Activemq的基本通信方式進(jìn)行探討,。activemq是JMS消息通信規(guī)范的一個(gè)實(shí)現(xiàn)??偟膩?lái)說(shuō),消息規(guī)范里面定義最常見(jiàn)的幾種消息通信模式主要有發(fā)布-訂閱,、點(diǎn)對(duì)點(diǎn)這兩種,。另外,,通過(guò)結(jié)合這些模式的具體應(yīng)用,我們?cè)谔幚砟承?yīng)用場(chǎng)景的時(shí)候也衍生出來(lái)了一種請(qǐng)求應(yīng)答的模式,。下面,我們針對(duì)這幾種方式一一討論一下,。
基礎(chǔ)流程
在討論具體方式的時(shí)候,,我們先看看使用activemq需要啟動(dòng)服務(wù)的主要過(guò)程,。
按照J(rèn)MS的規(guī)范,,我們首先需要獲得一個(gè)JMS connection factory.,,通過(guò)這個(gè)connection factory來(lái)創(chuàng)建connection.在這個(gè)基礎(chǔ)之上我們?cè)賱?chuàng)建session, destination, producer和consumer,。因此主要的幾個(gè)步驟如下:
1. 獲得JMS connection factory. 通過(guò)我們提供特定環(huán)境的連接信息來(lái)構(gòu)造factory,。
2. 利用factory構(gòu)造JMS connection
3. 啟動(dòng)connection
4. 通過(guò)connection創(chuàng)建JMS session.
5. 指定JMS destination.
6. 創(chuàng)建JMS producer或者創(chuàng)建JMS message并提供destination.
7. 創(chuàng)建JMS consumer或注冊(cè)JMS message listener.
8. 發(fā)送和接收J(rèn)MS message.
9. 關(guān)閉所有JMS資源,包括connection, session, producer, consumer等,。
publish-subscribe
發(fā)布訂閱模式有點(diǎn)類(lèi)似于我們?nèi)粘I钪杏嗛唸?bào)紙。每年到年尾的時(shí)候,,郵局就會(huì)發(fā)一本報(bào)紙集合讓我們來(lái)選擇訂閱哪一個(gè)。在這個(gè)表里頭列了所有出版發(fā)行的報(bào)紙,,那么對(duì)于我們每一個(gè)訂閱者來(lái)說(shuō),,我們可以選擇一份或者多份報(bào)紙,。比如北京日?qǐng)?bào),、瀟湘晨報(bào)等。那么這些個(gè)我們訂閱的報(bào)紙,,就相當(dāng)于發(fā)布訂閱模式里的topic,。有很多個(gè)人訂閱報(bào)紙,也有人可能和我訂閱了相同的報(bào)紙,。那么,,在這里,,相當(dāng)于我們?cè)谕粋€(gè)topic里注冊(cè)了,。對(duì)于一份報(bào)紙發(fā)行方來(lái)說(shuō),,它和所有的訂閱者就構(gòu)成了一個(gè)1對(duì)多的關(guān)系。這種關(guān)系如下圖所示:
現(xiàn)在,假定我們用前面討論的場(chǎng)景來(lái)寫(xiě)一個(gè)簡(jiǎn)單的示例,。我們首先需要定義的是publisher.
publisher
publisher是屬于發(fā)布信息的一方,,它通過(guò)定義一個(gè)或者多個(gè)topic,然后給這些topic發(fā)送消息,。
publisher的構(gòu)造函數(shù)如下:
public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); }
- public Publisher() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- try {
- connection.start();
- } catch (JMSException jmse) {
- connection.close();
- throw jmse;
- }
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
- }
我們按照前面說(shuō)的流程定義了基本的connectionFactory, connection, session, producer,。這里代碼就是主要實(shí)現(xiàn)初始化的效果,。
接著,,我們需要定義一系列的topic讓所有的consumer來(lái)訂閱,設(shè)置topic的代碼如下:
- protected void setTopics(String[] stocks) throws JMSException {
- destinations = new Destination[stocks.length];
- for(int i = 0; i < stocks.length; i++) {
- destinations[i] = session.createTopic("STOCKS." + stocks[i]);
- }
- }
這里destinations是一個(gè)內(nèi)部定義的成員變量Destination[],。這里我們總共定義了的topic數(shù)取決于給定的參數(shù)stocks,。
在定義好topic之后我們要給這些指定的topic發(fā)消息,具體實(shí)現(xiàn)的代碼如下:
- protected void sendMessage(String[] stocks) throws JMSException {
- for(int i = 0; i < stocks.length; i++) {
- Message message = createStockMessage(stocks[i], session);
- System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
- producer.send(destinations[i], message);
- }
- }
-
- protected Message createStockMessage(String stock, Session session) throws JMSException {
- MapMessage message = session.createMapMessage();
- message.setString("stock", stock);
- message.setDouble("price", 1.00);
- message.setDouble("offer", 0.01);
- message.setBoolean("up", true);
-
- return message;
- }
- protected void sendMessage(String[] stocks) throws JMSException {
- for(int i = 0; i < stocks.length; i++) {
- Message message = createStockMessage(stocks[i], session);
- System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
- producer.send(destinations[i], message);
- }
- }
-
- protected Message createStockMessage(String stock, Session session) throws JMSException {
- MapMessage message = session.createMapMessage();
- message.setString("stock", stock);
- message.setDouble("price", 1.00);
- message.setDouble("offer", 0.01);
- message.setBoolean("up", true);
-
- return message;
- }
前面的代碼很簡(jiǎn)單,,在sendMessage方法里我們遍歷每個(gè)topic,,然后給每個(gè)topic發(fā)送定義的Message消息。
在定義好前面發(fā)送消息的基礎(chǔ)之后,,我們調(diào)用他們的代碼就很簡(jiǎn)單了:
- public static void main(String[] args) throws JMSException {
- if(args.length < 1)
- throw new IllegalArgumentException();
-
- // Create publisher
- Publisher publisher = new Publisher();
-
- // Set topics
- publisher.setTopics(args);
-
- for(int i = 0; i < 10; i++) {
- publisher.sendMessage(args);
- System.out.println("Publisher '" + i + " price messages");
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- // Close all resources
- publisher.close();
- }
調(diào)用他們的代碼就是我們遍歷所有topic,,然后通過(guò)sendMessage發(fā)送消息。在發(fā)送一個(gè)消息之后先sleep1秒鐘,。要注意的一個(gè)地方就是我們使用完資源之后必須要使用close方法將這些資源關(guān)閉釋放,。close方法關(guān)閉資源的具體實(shí)現(xiàn)如下: - public void close() throws JMSException {
- if (connection != null) {
- connection.close();
- }
- }
consumer
Consumer的代碼也很類(lèi)似,,具體的步驟無(wú)非就是1.初始化資源。 2. 接收消息,。 3. 必要的時(shí)候關(guān)閉資源,。
初始化資源可以放到構(gòu)造函數(shù)里面:
- public Consumer() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
接收和處理消息的方法有兩種,分為同步和異步的,,一般同步的方式我們是通過(guò)MessageConsumer.receive()方法來(lái)處理接收到的消息,。而異步的方法則是通過(guò)注冊(cè)一個(gè)MessageListener的方法,,使用MessageConsumer.setMessageListener(),。這里我們采用異步的方式實(shí)現(xiàn):
- public static void main(String[] args) throws JMSException {
- Consumer consumer = new Consumer();
- for (String stock : args) {
- Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
- MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
- messageConsumer.setMessageListener(new Listener());
- }
- }
-
- public Session getSession() {
- return session;
- }
在前面的代碼里我們先找到同樣的topic,然后遍歷所有的topic去獲得消息。對(duì)于消息的處理我們專(zhuān)門(mén)通過(guò)Listener對(duì)象來(lái)負(fù)責(zé),。
Listener對(duì)象的職責(zé)很簡(jiǎn)單,,主要就是處理接收到的消息:
- public class Listener implements MessageListener {
-
- public void onMessage(Message message) {
- try {
- MapMessage map = (MapMessage)message;
- String stock = map.getString("stock");
- double price = map.getDouble("price");
- double offer = map.getDouble("offer");
- boolean up = map.getBoolean("up");
- DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
- System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
它實(shí)現(xiàn)了MessageListener接口,里面的onMessage方法就是在接收到消息之后會(huì)被調(diào)用的方法,。
現(xiàn)在,,通過(guò)實(shí)現(xiàn)前面的publisher和consumer我們已經(jīng)實(shí)現(xiàn)了pub-sub模式的一個(gè)實(shí)例。仔細(xì)回想它的步驟的話,,主要就是要兩者設(shè)定一個(gè)共同的topic,,有了這個(gè)topic之后他們可以實(shí)現(xiàn)一方發(fā)消息另外一方接收。另外,,為了連接到具體的message server,這里是使用了連接tcp://localhost:16161作為定義ActiveMQConnectionFactory的路徑,。在publisher端通過(guò)session創(chuàng)建producer,根據(jù)指定的參數(shù)創(chuàng)建destination,,然后將消息和destination作為producer.send()方法的參數(shù)發(fā)消息,。在consumer端也要?jiǎng)?chuàng)建類(lèi)似的connection,
session。通過(guò)session得到destination,,再通過(guò)session.createConsumer(destination)來(lái)得到一個(gè)MessageConsumer對(duì)象,。有了這個(gè)MessageConsumer我們就可以自行選擇是直接同步的receive消息還是注冊(cè)listener了。
p2p
p2p的過(guò)程則理解起來(lái)更加簡(jiǎn)單,。它好比是兩個(gè)人打電話,,這兩個(gè)人是獨(dú)享這一條通信鏈路的。一方發(fā)送消息,,另外一方接收,,就這么簡(jiǎn)單。在實(shí)際應(yīng)用中因?yàn)橛卸鄠€(gè)用戶對(duì)使用p2p的鏈路,,它的通信場(chǎng)景如下圖所示:
我們?cè)賮?lái)看看一個(gè)p2p的示例:
在p2p的場(chǎng)景里,,相互通信的雙方是通過(guò)一個(gè)類(lèi)似于隊(duì)列的方式來(lái)進(jìn)行交流。和前面pub-sub的區(qū)別在于一個(gè)topic有一個(gè)發(fā)送者和多個(gè)接收者,,而在p2p里一個(gè)queue只有一個(gè)發(fā)送者和一個(gè)接收者,。
發(fā)送者
和前面的示例非常相似,我們構(gòu)造函數(shù)里需要初始化的內(nèi)容基本上差不多:
- public Publisher() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(null);
- }
發(fā)送消息的方法如下:
- public void sendMessage() throws JMSException {
- for(int i = 0; i < jobs.length; i++)
- {
- String job = jobs[i];
- Destination destination = session.createQueue("JOBS." + job);
- Message message = session.createObjectMessage(i);
- System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);
- producer.send(destination, message);
- }
- }
這里我們定義了一個(gè)jobs的數(shù)組,,通過(guò)遍歷這個(gè)數(shù)組來(lái)創(chuàng)建不同的job queue,。這樣就相當(dāng)于建立了多個(gè)點(diǎn)對(duì)點(diǎn)通信的鏈路。
消息發(fā)送者的啟動(dòng)代碼如下:
- public static void main(String[] args) throws JMSException {
- Publisher publisher = new Publisher();
- for(int i = 0; i < 10; i++) {
- publisher.sendMessage();
- System.out.println("Published " + i + " job messages");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException x) {
- e.printStackTrace();
- }
- }
- publisher.close();
- }
我們?cè)谶@里發(fā)送10條消息,,當(dāng)然,,在每個(gè)sendMessage的方法里實(shí)際上是針對(duì)每個(gè)queue發(fā)送了10條。
接收者
接收者的代碼很簡(jiǎn)單,,一個(gè)構(gòu)造函數(shù)初始化所有的資源:
- public Consumer() throws JMSException {
- factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
還有一個(gè)就是注冊(cè)消息處理的對(duì)象:
- public static void main(String[] args) throws JMSException {
- Consumer consumer = new Consumer();
- for (String job : consumer.jobs) {
- Destination destination = consumer.getSession().createQueue("JOBS." + job);
- MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
- messageConsumer.setMessageListener(new Listener(job));
- }
- }
-
- public Session getSession() {
- return session;
- }
具體注冊(cè)的對(duì)象處理方法和前面還是類(lèi)似,,實(shí)現(xiàn)MessageListener接口就可以了,。
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.ObjectMessage;
-
- public class Listener implements MessageListener {
-
- private String job;
-
- public Listener(String job) {
- this.job = job;
- }
-
- public void onMessage(Message message) {
- try {
- //do something here
- System.out.println(job + " id:" + ((ObjectMessage)message).getObject());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
這里代碼和前面pub-sub的具體實(shí)現(xiàn)代碼非常相似,就不再贅述,。
現(xiàn)在如果我們比較一下pub-sub和p2p模式的具體實(shí)現(xiàn)步驟的話,,我們會(huì)發(fā)現(xiàn)他們基本的處理流程都是類(lèi)似的,除了在pub-sub中要通過(guò)createTopic來(lái)設(shè)置topic,,而在p2p中要通過(guò)createQueue來(lái)創(chuàng)建通信隊(duì)列,。他們之間存在著很多的重復(fù)之處,在具體的開(kāi)發(fā)過(guò)程中,,我們是否可以進(jìn)行一些工程上的優(yōu)化呢,?別急,后面我們會(huì)討論到的,。
request-response
和前面兩種方式比較起來(lái),request-response的通信方式很常見(jiàn),,但是不是默認(rèn)提供的一種模式,。在前面的兩種模式中都是一方負(fù)責(zé)發(fā)送消息而另外一方負(fù)責(zé)處理。而我們實(shí)際中的很多應(yīng)用相當(dāng)于一種一應(yīng)一答的過(guò)程,,需要雙方都能給對(duì)方發(fā)送消息,。于是請(qǐng)求-應(yīng)答的這種通信方式也很重要。它也應(yīng)用的很普遍,。
請(qǐng)求-應(yīng)答方式并不是JMS規(guī)范系統(tǒng)默認(rèn)提供的一種通信方式,,而是通過(guò)在現(xiàn)有通信方式的基礎(chǔ)上稍微運(yùn)用一點(diǎn)技巧實(shí)現(xiàn)的。下圖是典型的請(qǐng)求-應(yīng)答方式的交互過(guò)程:
在JMS里面,,如果要實(shí)現(xiàn)請(qǐng)求/應(yīng)答的方式,,可以利用JMSReplyTo和JMSCorrelationID消息頭來(lái)將通信的雙方關(guān)聯(lián)起來(lái)。另外,,QueueRequestor和TopicRequestor能夠支持簡(jiǎn)單的請(qǐng)求/應(yīng)答過(guò)程,。
現(xiàn)在,如果我們要實(shí)現(xiàn)這么一個(gè)過(guò)程,,在發(fā)送請(qǐng)求消息并且等待返回結(jié)果的client端的流程如下:
- // client side
- Destination tempDest = session.createTemporaryQueue();
- MessageConsumer responseConsumer = session.createConsumer(tempDest);
- ...
-
- // send a request..
- message.setJMSReplyTo(tempDest)
- message.setJMSCorrelationID(myCorrelationID);
-
- producer.send(message);
client端創(chuàng)建一個(gè)臨時(shí)隊(duì)列并在發(fā)送的消息里指定了發(fā)送返回消息的destination以及correlationID,。那么在處理消息的server端得到這個(gè)消息后就知道該發(fā)送給誰(shuí)了。Server端的大致流程如下:
- public void onMessage(Message request) {
-
- Message response = session.createMessage();
- response.setJMSCorrelationID(request.getJMSCorrelationID())
-
- producer.send(request.getJMSReplyTo(), response)
- }
這里我們是用server端注冊(cè)MessageListener,,通過(guò)設(shè)置返回信息的CorrelationID和JMSReplyTo將信息返回,。
以上就是發(fā)送和接收消息的雙方的大致程序結(jié)構(gòu)。具體的實(shí)現(xiàn)代碼如下:
Client:
- public Client() {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection connection;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(transacted, ackMode);
- Destination adminQueue = session.createQueue(clientQueueName);
-
- //Setup a message producer to send message to the queue the server is consuming from
- this.producer = session.createProducer(adminQueue);
- this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- //Create a temporary queue that this client will listen for responses on then create a consumer
- //that consumes message from this temporary queue...for a real application a client should reuse
- //the same temp queue for each message to the server...one temp queue per client
- Destination tempDest = session.createTemporaryQueue();
- MessageConsumer responseConsumer = session.createConsumer(tempDest);
-
- //This class will handle the messages to the temp queue as well
- responseConsumer.setMessageListener(this);
-
- //Now create the actual message you want to send
- TextMessage txtMessage = session.createTextMessage();
- txtMessage.setText("MyProtocolMessage");
-
- //Set the reply to field to the temp queue you created above, this is the queue the server
- //will respond to
- txtMessage.setJMSReplyTo(tempDest);
-
- //Set a correlation ID so when you get a response you know which sent message the response is for
- //If there is never more than one outstanding message to the server then the
- //same correlation ID can be used for all the messages...if there is more than one outstanding
- //message to the server you would presumably want to associate the correlation ID with this
- //message somehow...a Map works good
- String correlationId = this.createRandomString();
- txtMessage.setJMSCorrelationID(correlationId);
- this.producer.send(txtMessage);
- } catch (JMSException e) {
- //Handle the exception appropriately
- }
- }
這里的代碼除了初始化構(gòu)造函數(shù)里的參數(shù)還同時(shí)設(shè)置了兩個(gè)destination,,一個(gè)是自己要發(fā)送消息出去的destination,,在session.createProducer(adminQueue);這一句設(shè)置。另外一個(gè)是自己要接收的消息destination, 通過(guò)Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 這兩句指定了要接收消息的目的地,。這里是用的一個(gè)臨時(shí)隊(duì)列,。在前面指定了返回消息的通信隊(duì)列之后,,我們需要通知server端知道發(fā)送返回消息給哪個(gè)隊(duì)列。于是txtMessage.setJMSReplyTo(tempDest);指定了這一部分,,同時(shí)txtMessage.setJMSCorrelationID(correlationId);方法主要是為了保證每次發(fā)送回來(lái)請(qǐng)求的server端能夠知道對(duì)應(yīng)的是哪個(gè)請(qǐng)求,。這里一個(gè)請(qǐng)求和一個(gè)應(yīng)答是相當(dāng)于對(duì)應(yīng)一個(gè)相同的序列號(hào)一樣。
同時(shí),,因?yàn)閏lient端在發(fā)送消息之后還要接收server端返回的消息,,所以它也要實(shí)現(xiàn)一個(gè)消息receiver的功能。這里采用實(shí)現(xiàn)MessageListener接口的方式:
- public void onMessage(Message message) {
- String messageText = null;
- try {
- if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- messageText = textMessage.getText();
- System.out.println("messageText = " + messageText);
- }
- } catch (JMSException e) {
- //Handle the exception appropriately
- }
- }
Server:
這里server端要執(zhí)行的過(guò)程和client端相反,,它是先接收消息,,在接收到消息后根據(jù)提供的JMSCorelationID來(lái)發(fā)送返回的消息:
- public void onMessage(Message message) {
- try {
- TextMessage response = this.session.createTextMessage();
- if (message instanceof TextMessage) {
- TextMessage txtMsg = (TextMessage) message;
- String messageText = txtMsg.getText();
- response.setText(this.messageProtocol.handleProtocolMessage(messageText));
- }
-
- //Set the correlation ID from the received message to be the correlation id of the response message
- //this lets the client identify which message this is a response to if it has more than
- //one outstanding message to the server
- response.setJMSCorrelationID(message.getJMSCorrelationID());
-
- //Send the response to the Destination specified by the JMSReplyTo field of the received message,
- //this is presumably a temporary queue created by the client
- this.replyProducer.send(message.getJMSReplyTo(), response);
- } catch (JMSException e) {
- //Handle the exception appropriately
- }
- }
前面,在replyProducer.send()方法里,,message.getJMSReplyTo()就得到了要發(fā)送消息回去的destination,。
另外,設(shè)置這些發(fā)送返回信息的replyProducer的信息主要在構(gòu)造函數(shù)相關(guān)的方法里實(shí)現(xiàn)了:
- public Server() {
- try {
- //This message broker is embedded
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.addConnector(messageBrokerUrl);
- broker.start();
- } catch (Exception e) {
- //Handle the exception appropriately
- }
-
- //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
- //is ready to handle messages
- this.messageProtocol = new MessageProtocol();
- this.setupMessageQueueConsumer();
- }
-
- private void setupMessageQueueConsumer() {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
- Connection connection;
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- this.session = connection.createSession(this.transacted, ackMode);
- Destination adminQueue = this.session.createQueue(messageQueueName);
-
- //Setup a message producer to respond to messages from clients, we will get the destination
- //to send to from the JMSReplyTo header field from a Message
- this.replyProducer = this.session.createProducer(null);
- this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- //Set up a consumer to consume messages off of the admin queue
- MessageConsumer consumer = this.session.createConsumer(adminQueue);
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- //Handle the exception appropriately
- }
- }
總體來(lái)說(shuō),,整個(gè)的交互過(guò)程并不復(fù)雜,,只是比較繁瑣。對(duì)于請(qǐng)求/應(yīng)答的方式來(lái)說(shuō),,這種典型交互的過(guò)程就是Client端在設(shè)定正常發(fā)送請(qǐng)求的Queue同時(shí)也設(shè)定一個(gè)臨時(shí)的Queue,。同時(shí)在要發(fā)送的message里頭指定要返回消息的destination以及CorelationID,這些就好比是一封信里面所帶的回執(zhí),。根據(jù)這個(gè)信息人家才知道怎么給你回信,。對(duì)于Server端來(lái)說(shuō)則要額外創(chuàng)建一個(gè)producer,在處理接收到消息的方法里再利用producer將消息發(fā)回去,。這一系列的過(guò)程看起來(lái)很像http協(xié)議里面請(qǐng)求-應(yīng)答的方式,,都是一問(wèn)一答。
一些應(yīng)用和改進(jìn)
回顧前面三種基本的通信方式,,我們會(huì)發(fā)現(xiàn),,他們都存在著一定的共同點(diǎn),比如說(shuō)都要初始化ConnectionFactory, Connection, Session等,。在使用完之后都要將這些資源關(guān)閉,。如果每一個(gè)實(shí)現(xiàn)它的通信端都這么寫(xiě)一通的話,其實(shí)是一種簡(jiǎn)單的重復(fù),。從工程的角度來(lái)看是完全沒(méi)有必要的,。那么,我們有什么辦法可以減少這種重復(fù)呢,?
一種簡(jiǎn)單的方式就是通過(guò)工廠方法封裝這些對(duì)象的創(chuàng)建和銷(xiāo)毀,,然后簡(jiǎn)單的通過(guò)調(diào)用工廠方法的方式得到他們。另外,,既然基本的流程都是在開(kāi)頭創(chuàng)建資源在結(jié)尾銷(xiāo)毀,,我們也可以采用Template Method模式的思路,。通過(guò)繼承一個(gè)抽象類(lèi),在抽象類(lèi)里提供了資源的封裝,。所有繼承的類(lèi)只要實(shí)現(xiàn)怎么去使用這些資源的方法就可以了,。Spring中間的JMSTemplate就提供了這種類(lèi)似思想的封裝。具體的實(shí)現(xiàn)可以參考這篇文章,。
總結(jié)
activemq默認(rèn)提供了pub-sub, p2p這兩種通信的方式,。同時(shí)也提供了一些對(duì)request-response方式的支持。實(shí)際上,,不僅僅是activemq,,對(duì)于所有其他實(shí)現(xiàn)JMS規(guī)范的產(chǎn)品都能夠提供類(lèi)似的功能。這里每種方式都不太復(fù)雜,,主要是創(chuàng)建和管理資源的步驟顯得比較繁瑣,。
參考資料
activemq in action
http://activemq./how-should-i-implement-request-response-with-jms.html
http://zorro.blog.51cto.com/2139862/831986
|