自JMS 1.1于2002年發(fā)布以來,,JMS規(guī)范今年進(jìn)行了第一次更新——發(fā)布了JMS 2.0,。 在JMS 2.0里,主要進(jìn)行了易用性方面的提升,、簡化了開發(fā)(這終于追趕上EJB、JPA等Java EE里其他子規(guī)范的腳步了),。另一方面,,消息處理本身也增加了一些新特性,比如多個(gè)消費(fèi)者共享同一個(gè)主題訂閱,、延遲發(fā)送,、異步發(fā)送消息、JMS提供者必須設(shè)置JMSXDeliveryCount消息屬性等,。 接下來看看具體的變化和內(nèi)容,。 簡化的APIJMS 2.0里最大的變化是引入了一組新的API,用來發(fā)送,、接收消息,,減少了開發(fā)人員的編碼量。對運(yùn)行在Java EE應(yīng)用服務(wù)器里的應(yīng)用來說,,新的API也支持資源注入,,這樣的話,,JMS對象的創(chuàng)建和管理就可以由應(yīng)用服務(wù)器負(fù)責(zé),應(yīng)用也可以進(jìn)一步簡化,。 JMS 2.0是Java EE 7的一部分,,可以用在Web應(yīng)用或EJB應(yīng)用里,當(dāng)然,,它也可以直接在Java SE環(huán)境里使用,。 新的API叫做“簡化的API”,物如其名,,它比JMS 1.1的API更簡單,、更易用。簡化的API包含三個(gè)新的接口:
簡化的API包括1.1 API的所有特性,,而且新增了一些功能。但2.0向后兼容,,并沒有廢棄1.1的API,,開發(fā)人員仍然可以繼續(xù)使用1.1的API。 下面我們來比較一下新舊API的代碼示例,。 發(fā)送消息JMS 1.1
Java代碼
public void sendMessage11(ConnectionFactory connectionFactory, Queue queue, String msg) {
Connection connection = null;
try {
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage(msg);
messageProducer.send(textMessage);
} catch (JMSException ex) {
// handle exception
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
JMS 2.0
Java代碼
public void sendMessage20(ConnectionFactory connectionFactory, Queue queue, String msg) {
try (JMSContext context = connectionFactory.createContext();){
context.createProducer().send(queue, msg);
} catch (JMSRuntimeException ex) {
// handle exception
}
}
顯而易見,,使用2.0 API的代碼更為簡潔。具體來說:
1,、1.1要分別創(chuàng)建Connection和Session對象,,而2.0只創(chuàng)建一個(gè)JMSContext對象。 2,、1.1要在發(fā)送完消息之后釋放連接,,穩(wěn)妥的做法就是在finally塊里調(diào)用Connection的close方法(MessageProducer和Session不用分別close,連接關(guān)閉后會(huì)被刪除),。但2.0里的JMSContext實(shí)現(xiàn)了Java SE 7里的java.lang.AutoCloseable接口,,我們就可以使用Java SE 7里的try-with-resources塊(也是Java SE 7里的一個(gè)新特性),這個(gè)塊執(zhí)行完之后會(huì)自動(dòng)調(diào)用JMSContext的close方法釋放資源,,而不用我們在代碼里顯式關(guān)閉。 實(shí)際上,,2.0里所有有close方法的接口(包括Connection,、Session,、MessageProducer、MessageConsumer)都實(shí)現(xiàn)了java.lang.AutoCloseable接口,,它們都可以用在try-with-resources塊里,。不過這只能用在Java SE 7的環(huán)境里。 3,、1.1里創(chuàng)建Session對象時(shí)需要指定會(huì)話是否使用本地事務(wù),,以及消息確認(rèn)方式。2.0提供了默認(rèn)配置,,也提供了API去設(shè)置其他的會(huì)話模式,。 4、上面的例子是發(fā)送一個(gè)文本消息,。1.1里需要?jiǎng)?chuàng)建消息對象(TextMessage),,將消息體設(shè)置為指定的字符串。2.0里直接把字符串傳給send方法就可以了,,JMS提供者會(huì)自動(dòng)完成1.1里需要開發(fā)人員做的事情,。 5、1.1的API拋出的都是受檢查的異常JMSException,。2.0里新的API則會(huì)拋出運(yùn)行時(shí)異常JMSRuntimeException,,所以我們的代碼里可以不再顯式地進(jìn)行捕獲。 同步接收消息JMS 1.1
Java代碼
public String receiveMessage11(ConnectionFactory connectionFactory, Queue queue) {
String msg = '';
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage textMessage = (TextMessage) messageConsumer.receive();
msg = textMessage.getText();
} catch (JMSException ex) {
// handle exception
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
return msg;
}
JMS 2.0
Java代碼
public String receiveMessage20(ConnectionFactory connectionFactory, Queue queue){
String msg = null;
try (JMSContext context = connectionFactory.createContext();){
JMSConsumer consumer = session.createConsumer(queue);
msg = consumer.receiveBody(String.class);
} catch (JMSRuntimeException ex) {
// handle exception
}
return msg;
}
接收消息的代碼互相對比,,簡化的部分和發(fā)送消息是類似的,。需要特別說明的是:
1、1.1需要調(diào)用connection.start()把消息分發(fā)到消費(fèi)者,。2.0里連接會(huì)自動(dòng)啟動(dòng),。 2、1.1接收的是Message對象,,需要顯式進(jìn)行類型轉(zhuǎn)換,,再進(jìn)一步獲取消息體,而且在1.1里,,針對不同的消息類型,,獲取消息體的方法也是各自特定的。2.0里統(tǒng)一調(diào)用receiveBody方法就可以直接返回消息體,,或者獲取消息后統(tǒng)一用getBody方法獲取消息體,。 異步接收消息前面接收消息的例子是同步的,接收方法在接收到消息之前會(huì)一直阻塞,,除非超時(shí),。 如果想異步接收消息,需要給MessageConsumer或JMSConsumer對象設(shè)置MessageListener接口的實(shí)現(xiàn)類,,讓MessageListener去處理消息(onMessage方法),。 不同之處仍然是1.1需要顯式調(diào)用connection.start(),。 JMS 1.1
Java代碼
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(msgListener);
connection.start();
JMS 2.0
Java代碼
JMSConsumer consumer = context.createConsumer(queue);
consumer.setMessageListener(messageListener);
注意,上面的示例代碼是在Java客戶端代碼里的寫法,。如果要在Web應(yīng)用或EJB應(yīng)用里異步接收消息,,需要使用消息驅(qū)動(dòng)Bean。
將JMSContext注入Java EE應(yīng)用編寫Web應(yīng)用或EJB應(yīng)用時(shí),,使用JMS 2.0簡化的API會(huì)更加簡單,。因?yàn)镴ava EE 7的應(yīng)用服務(wù)器會(huì)負(fù)責(zé)JMSContext的創(chuàng)建和關(guān)閉,開發(fā)人員只需要把它注入應(yīng)用就可以了,。
Java代碼
// @Inject注解告訴容器創(chuàng)建JMSContext
@Inject
// @JMSConnectionFactory注解告訴容器應(yīng)用使用的連接工廠的JNDI名
@JMSConnectionFactory('jms/connectionFactory')
private JMSContext context;
@Resource(lookup='jms/queue')
private Queue queue;
public void sendMessage(String msg) {
context.send(queue, msg);
}
更簡單的資源配置對JMS應(yīng)用來說,,需要使用連接工廠和目的地(隊(duì)列或主題)。連接工廠需要知道JMS提供者所在的主機(jī)名和監(jiān)聽端口,,應(yīng)用通過連接工廠創(chuàng)建到JMS提供者的連接,;隊(duì)列或主題則要知道作為消息端點(diǎn)的物理隊(duì)列或物理主題。由于每個(gè)JMS提供者創(chuàng)建,、配置它們的方式不盡相同,,所以JMS推薦單獨(dú)創(chuàng)建、配置,,然后綁定到JNDI樹上,,應(yīng)用使用時(shí)再通過JNDI查找、直接獲取連接工廠或目的地對象,。Java EE規(guī)范推薦把代碼和配置分開,,是為了保證應(yīng)用的可移植性,代碼不需要知道具體的物理細(xì)節(jié)(像主機(jī)名,、端口,、物理名稱等)。 但對于大規(guī)模集群上部署的應(yīng)用來說,,代碼和配置分離反而可能會(huì)增加額外的工作量,,尤其是在集群不能統(tǒng)一配置的情形下。而且在這樣的生產(chǎn)環(huán)境下,,應(yīng)用重部署和遷移的情況相對較少,,那能否在應(yīng)用部署的過程中就一起把應(yīng)用要使用的資源對象給創(chuàng)建出來、省去管理員單獨(dú)配置的環(huán)節(jié)呢,? JMS 2.0簡化了JMS資源的配置,,就適用于前面說的情形。應(yīng)用開發(fā)人員可以在代碼里使用2.0新增的JMS資源定義注解,,也可以在部署描述符里定義,,也可以兩者結(jié)合。 JMS資源定義注解JMS資源定義注解主要有javax.jms.JMSConnectionFactoryDefinition和javax.jms.JMSDestinationDefinition,可以直接定義在EJB或Servlet里,。
Java代碼
@JMSConnectionFactoryDefinition(
name='java:global/jms/connectionFactory',
maxPoolSize = 30,
minPoolSize= 20,
properties = {
'addressList=mq://localhost:7676',
'reconnectEnabled=true'
}
)
@JMSDestinationDefinition(
name = 'java:global/jms/queue',
interfaceName = 'javax.jms.Queue',
destinationName = 'queue'
)
public class TestServlet extends HttpServlet {
...
}
如果需要定義多個(gè)連接工廠或目的地,,JMSConnectionFactoryDefinition和JMSDestinationDefinition可以分別包含在MSConnectionFactoryDefinitions和JMSDestinationDefinitions注解里。類似于EJBs和EJB,。
Java代碼
@JMSConnectionFactoryDefinitions({
@JMSConnectionFactoryDefinition(
name='java:global/jms/connectionFactory1',
maxPoolSize = 30,
minPoolSize= 20,
properties = {
'addressList=mq://localhost:7676',
'reconnectEnabled=true'
}
),
@JMSConnectionFactoryDefinition(
name='java:global/jms/connectionFactory2',
maxPoolSize = 30,
minPoolSize= 20,
properties = {
'addressList=mq://localhost:7677',
'reconnectEnabled=true'
}
)
})
@JMSDestinationDefinitions({
@JMSDestinationDefinition(
name='java:global/jms/queue1',
interfaceName = 'javax.jms.Queue',
destinationName = 'queue1'
),
@JMSDestinationDefinition(
name='java:global/jms/queue2',
interfaceName = 'javax.jms.Queue',
destinationName = 'queue2'
)
})
public class TestServlet extends HttpServlet {
...
}
需要注意的是,使用JMS資源定義注解定義的連接工廠和目的地必須放在java:comp,、java:module,、java:app或者java:global名字空間里,生命周期和應(yīng)用保持一致,,即部署應(yīng)用時(shí)生成,,解部署應(yīng)用時(shí)就會(huì)刪除。 部署描述符如果不想讓資源的配置侵入代碼,,或者開發(fā)的時(shí)候還不確定JMS提供者的真實(shí)信息(比如主機(jī)地址和端口),,那可以選擇在應(yīng)用的部署描述里定義連接工廠和目的地,比如web.xml或ejb-jar.xml,。 Xml代碼
<jms-connection-factory>
<name>java:global/jms/connectionFactory</name>
<max-pool-size>30</max-pool-size>
<min-pool-size>20</min-pool-size>
<property>
<name>addressList</name>
<value>mq://localhost:7676</value>
</property>
<property>
<name>reconnectEnabled</name>
<value>true</value>
</property>
</jms-connection-factory>
<jms-destination>
<name>java:global/jms/queue</name>
<interfaceName>javax.jms.Queue</interfaceName>
<destinationName>queue</destinationName>
</jms-destination>
延遲發(fā)送2.0里,,消息生產(chǎn)者可以指定一個(gè)延遲發(fā)送時(shí)間,JMS提供者等過了這么長時(shí)間之后再發(fā)送消息,。 這個(gè)功能具體體現(xiàn)在MessageProducer和JMSProducer的API里,。 MessageProducerJava代碼
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryDelay(20000); // 20000毫秒,,發(fā)送消息之前設(shè)置
TextMessage textMessage = session.createTextMessage('Hello World!');
messageProducer.send(textMessage);
JMSProducerJava代碼
try (JMSContext context = connectionFactory.createContext();){
context.createProducer().setDeliveryDelay(20000).send(queue, 'Hello World!'); // 20000毫秒,也是在發(fā)送消息之前設(shè)置
}
異步發(fā)送消息2.0里新的send方法允許應(yīng)用異步發(fā)送消息,。 傳統(tǒng)的消息發(fā)送方式是阻塞的,,消息安全發(fā)送到JMS提供者之后,發(fā)送方法才會(huì)返回,。如果消息是持久化的,,那中間的阻塞時(shí)間還要算上持久化花費(fèi)的時(shí)間(寫文件或?qū)憯?shù)據(jù)庫,或者其他的存儲(chǔ)介質(zhì)),。 異步發(fā)送在發(fā)送完消息之后并不會(huì)等待JMS提供者的應(yīng)答,,而是繼續(xù)做其它事情。JMS提供者收到消息,、做完持久化之后,,會(huì)回調(diào)應(yīng)用指定的CompletionListener的onCompletion方法,通知客戶端應(yīng)用消息發(fā)送的結(jié)果,。這樣的話,,客戶端應(yīng)用的執(zhí)行效率就可以大大提升。 新的JMSProducer和以前就有的MessageProducer都支持異步發(fā)送,。我們來看看具體的例子,。 CompletionListenerJava代碼
class SampleCompletionListener implements CompletionListener {
CountDownLatch latch;
Exception exception;
public SampleCompletionListener(CountDownLatch latch) {
this.latch=latch;
}
@Override
public void onCompletion(Message message) {
latch.countDown();
}
@Override
public void onException(Message message, Exception exception) {
latch.countDown();
this.exception = exception;
}
public Exception getException(){
return exception;
}
}
MessageProducerJava代碼
private void asyncSend11(ConnectionFactory connectionFactory, Queue queue) throws Exception {
try (Connection connection = connectionFactory.createConnection();){
Session session = connection.createSession();
MessageProducer messageProducer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage('Hello World!');
CountDownLatch latch = new CountDownLatch(1);
SampleCompletionListener completionListener = new SampleCompletionListener(latch);
messageProducer.send(textMessage, completionListener);
System.out.println('Waiting for a reply...');
// 做其他事情,不讓應(yīng)用閑等
latch.await();
Exception exception = completionListener.getException();
if (exception == null){
System.out.println('Send message successfully.');
} else {
System.out.println('Failed to send message. ' exception.getMessage());
}
}
}
JMSProducerJava代碼
private void asyncSend20(ConnectionFactory connectionFactory, Queue queue) throws Exception {
try (JMSContext context = connectionFactory.createContext();){
CountDownLatch latch = new CountDownLatch(1);
SampleCompletionListener completionListener = new SampleCompletionListener(latch);
context.createProducer().setAsync(completionListener).send(queue, 'Hello World!');
System.out.println('Waiting for a reply...');
// 做其他事情,,不讓應(yīng)用閑等
latch.await();
Exception exception = completionListener.getException();
if (exception == null){
System.out.println('Send message successfully.');
} else {
System.out.println('Failed to send message. ' exception.getMessage());
}
}
}
不過這個(gè)特性不適用于Java EE的Web容器和EJB容器,只可用于Java SE應(yīng)用,、或者是Java EE Application Client容器,。 多個(gè)消費(fèi)者共享同一個(gè)主題訂閱在JMS 1.1里,主題訂閱和消費(fèi)者是一一對應(yīng)的,,也就是在同一時(shí)間,,一個(gè)主題訂閱只能有一個(gè)消費(fèi)者。 Java代碼
private void createConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(topic);
connection.start();
Message message = messageConsumer.receive(10000);
while (message != null) {
System.out.println('Message received: ' ((TextMessage) message).getText());
message = messageConsumer.receive(10000);
}
connection.close();
}
如果我們想調(diào)用createConsumer創(chuàng)建多個(gè)消費(fèi)者,,那每個(gè)消費(fèi)者各有一個(gè)訂閱,,每個(gè)消費(fèi)者都會(huì)從主題接收到一份消息拷貝。舉例來說,,有兩個(gè)消費(fèi)者A和B,,主題topic上有三條消息x、y,、z,,那A會(huì)拿到x、y,、z的一份拷貝,,B也會(huì)拿到x、y,、z的一份拷貝,。要是想A和B能合起來消費(fèi)x、y,、z,,即A消費(fèi)x、z,,B消費(fèi)y,,那只能是A和B共享同一個(gè)訂閱了,可惜1.1并不支持共享的訂閱,。 2.0里可以指定訂閱是“可共享的”,,不論訂閱是持久的還是非持久的,而且Session和JMSContext都支持這個(gè)特性,。這樣的話,,消息的處理就可以由多線程、或者多個(gè)不同的連接、甚至多個(gè)Java進(jìn)程同時(shí)進(jìn)行,,顯而易見,,這很利于提升應(yīng)用的可伸縮性。 Java代碼
private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createSharedConsumer(topic, 'ourSubscription'); // 需要指定共享訂閱的名稱,,以便多個(gè)消費(fèi)者能確定彼此共享的訂閱
connection.start();
Message message = messageConsumer.receive(10000);
while (message != null) {
System.out.println('Message received: ' ((TextMessage) message).getText());
message = messageConsumer.receive(10000);
}
connection.close();
}
前面的例子創(chuàng)建的是非持久的訂閱,同樣的,,持久訂閱在2.0里也是可以共享的,,而且Session和JMSContext也都支持。 Java代碼
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic, 'ourDurableSubscription');
當(dāng)然,,2.0依舊支持不共享的訂閱。 JMS提供者必須設(shè)置消息屬性JMSXDeliveryCountJMS消息由三部分組成:
標(biāo)準(zhǔn)屬性里,,有一個(gè)JMSXDeliveryCount(int類型),它由JMS提供者設(shè)置,,表示JMS提供者給消費(fèi)者發(fā)送消息的嘗試次數(shù),。第一次發(fā)送消息的時(shí)候,該屬性的值設(shè)置為1,,然后每嘗試發(fā)送一次,,值就加一。如果該屬性的值為N,,那就說明前N-1次都發(fā)送失敗了,。消費(fèi)者如果不做特殊的處理,那會(huì)一直重復(fù)發(fā)送,。 消費(fèi)者可以根據(jù)這個(gè)值確認(rèn)消息是否被重復(fù)發(fā)送了,,進(jìn)而可以進(jìn)行特殊的處理,比如把這條消息放入死信隊(duì)列等,。 JMSXDeliveryCount在1.1里就有,,只是是可選的,2.0里則變成了必需的,。下面是一個(gè)借助JMSXDeliveryCount屬性的值進(jìn)行不同處理的例子: Java代碼
class SampleMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
int deliveryCount = message.getIntProperty('JMSXDeliveryCount');
if (deliveryCount < 10){
// 故意拋出運(yùn)行時(shí)一場,,模擬消息處理失敗的情形,使得JMS提供者能重發(fā)消息
throw new RuntimeException('Exception thrown to simulate a bad message');
} else {
// 消息已經(jīng)被發(fā)送了10次,,放棄重發(fā),,進(jìn)行其他的處理
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
JMS 2.0主要的新特性就介紹完了,其實(shí)在細(xì)節(jié)上還有不少變化,,大家可以參考JMS 2.0規(guī)范和API文檔理解更多內(nèi)容,。 |
|