久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

JMS 2.0的新變化

 yespon 2016-09-12

自JMS 1.1于2002年發(fā)布以來,,JMS規(guī)范今年進(jìn)行了第一次更新——發(fā)布了JMS 2.0,。

在JMS 2.0里,主要進(jìn)行了易用性方面的提升,、簡化了開發(fā)(這終于追趕上EJB、JPA等Java EE里其他子規(guī)范的腳步了),。另一方面,,消息處理本身也增加了一些新特性,比如多個(gè)消費(fèi)者共享同一個(gè)主題訂閱,、延遲發(fā)送,、異步發(fā)送消息、JMS提供者必須設(shè)置JMSXDeliveryCount消息屬性等,。

接下來看看具體的變化和內(nèi)容,。

簡化的API

JMS 2.0里最大的變化是引入了一組新的API,用來發(fā)送,、接收消息,,減少了開發(fā)人員的編碼量。對運(yùn)行在Java EE應(yīng)用服務(wù)器里的應(yīng)用來說,,新的API也支持資源注入,,這樣的話,,JMS對象的創(chuàng)建和管理就可以由應(yīng)用服務(wù)器負(fù)責(zé),應(yīng)用也可以進(jìn)一步簡化,。

JMS 2.0是Java EE 7的一部分,,可以用在Web應(yīng)用或EJB應(yīng)用里,當(dāng)然,,它也可以直接在Java SE環(huán)境里使用,。

新的API叫做“簡化的API”,物如其名,,它比JMS 1.1的API更簡單,、更易用。簡化的API包含三個(gè)新的接口:

  • JMSContext:綜合了1.1里的Connection和Session,;
  • JMSProducer:1.1里MessageProducer的替換者,。支持消息發(fā)送選項(xiàng)、頭信息,,還可以用方法鏈配置屬性,;
  • JMSConsumer:替換1.1里的MessageConsumer,用法和MessageConsumer類似,。

簡化的API包括1.1 API的所有特性,,而且新增了一些功能。但2.0向后兼容,,并沒有廢棄1.1的API,,開發(fā)人員仍然可以繼續(xù)使用1.1的API。

下面我們來比較一下新舊API的代碼示例,。

發(fā)送消息

JMS 1.1

 

Java代碼  收藏代碼
  1. public void sendMessage11(ConnectionFactory connectionFactory, Queue queue, String msg) {  
  2.    Connection connection = null;  
  3.    try {  
  4.       connection = connectionFactory.createConnection();  
  5.       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  6.       MessageProducer messageProducer = session.createProducer(queue);  
  7.       TextMessage textMessage = session.createTextMessage(msg);  
  8.       messageProducer.send(textMessage);  
  9.    } catch (JMSException ex) {  
  10.       // handle exception  
  11.    } finally {  
  12.       if (connection != null) {  
  13.          try {  
  14.             connection.close();  
  15.          } catch (JMSException ex) {  
  16.          }  
  17.       }        
  18.    }  
  19. }  
public void sendMessage11(ConnectionFactory connectionFactory, Queue queue, String msg) { Connection connection = null; try { connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer messageProducer = session.createProducer(queue); TextMessage textMessage = session.createTextMessage(msg); messageProducer.send(textMessage); } catch (JMSException ex) { // handle exception } finally { if (connection != null) { try { connection.close(); } catch (JMSException ex) { } } } }

 

 JMS 2.0

 

Java代碼  收藏代碼
  1. public void sendMessage20(ConnectionFactory connectionFactory, Queue queue, String msg) {  
  2.    try (JMSContext context = connectionFactory.createContext();){  
  3.       context.createProducer().send(queue, msg);  
  4.    } catch (JMSRuntimeException ex) {  
  5.       // handle exception  
  6.    }  
  7. }  
public void sendMessage20(ConnectionFactory connectionFactory, Queue queue, String msg) { try (JMSContext context = connectionFactory.createContext();){ context.createProducer().send(queue, msg); } catch (JMSRuntimeException ex) { // handle exception } }

 顯而易見,,使用2.0 API的代碼更為簡潔。具體來說:

 

    1,、1.1要分別創(chuàng)建Connection和Session對象,,而2.0只創(chuàng)建一個(gè)JMSContext對象。

    2,、1.1要在發(fā)送完消息之后釋放連接,,穩(wěn)妥的做法就是在finally塊里調(diào)用Connection的close方法(MessageProducer和Session不用分別close,連接關(guān)閉后會(huì)被刪除),。但2.0里的JMSContext實(shí)現(xiàn)了Java SE 7里的java.lang.AutoCloseable接口,,我們就可以使用Java SE 7里的try-with-resources塊(也是Java SE 7里的一個(gè)新特性),這個(gè)塊執(zhí)行完之后會(huì)自動(dòng)調(diào)用JMSContext的close方法釋放資源,,而不用我們在代碼里顯式關(guān)閉。

    實(shí)際上,,2.0里所有有close方法的接口(包括Connection,、Session,、MessageProducer、MessageConsumer)都實(shí)現(xiàn)了java.lang.AutoCloseable接口,,它們都可以用在try-with-resources塊里,。不過這只能用在Java SE 7的環(huán)境里。

    3,、1.1里創(chuàng)建Session對象時(shí)需要指定會(huì)話是否使用本地事務(wù),,以及消息確認(rèn)方式。2.0提供了默認(rèn)配置,,也提供了API去設(shè)置其他的會(huì)話模式,。

    4、上面的例子是發(fā)送一個(gè)文本消息,。1.1里需要?jiǎng)?chuàng)建消息對象(TextMessage),,將消息體設(shè)置為指定的字符串。2.0里直接把字符串傳給send方法就可以了,,JMS提供者會(huì)自動(dòng)完成1.1里需要開發(fā)人員做的事情,。

    5、1.1的API拋出的都是受檢查的異常JMSException,。2.0里新的API則會(huì)拋出運(yùn)行時(shí)異常JMSRuntimeException,,所以我們的代碼里可以不再顯式地進(jìn)行捕獲。

同步接收消息

JMS 1.1

 

Java代碼  收藏代碼
  1. public String receiveMessage11(ConnectionFactory connectionFactory, Queue queue) {  
  2.    String msg = '';  
  3.    Connection connection = null;  
  4.    try {  
  5.       connection = connectionFactory.createConnection();  
  6.       connection.start();  
  7.       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  8.       MessageConsumer messageConsumer = session.createConsumer(queue);        
  9.       TextMessage textMessage = (TextMessage) messageConsumer.receive();  
  10.       msg = textMessage.getText();  
  11.    } catch (JMSException ex) {  
  12.       // handle exception  
  13.    } finally {  
  14.       if (connection != null) {  
  15.          try {  
  16.             connection.close();  
  17.          } catch (JMSException ex) {  
  18.          }  
  19.       }        
  20.    }  
  21.    return msg;  
  22. }  
public String receiveMessage11(ConnectionFactory connectionFactory, Queue queue) { String msg = ''; Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(queue); TextMessage textMessage = (TextMessage) messageConsumer.receive(); msg = textMessage.getText(); } catch (JMSException ex) { // handle exception } finally { if (connection != null) { try { connection.close(); } catch (JMSException ex) { } } } return msg; }

JMS 2.0

 

Java代碼  收藏代碼
  1. public String receiveMessage20(ConnectionFactory connectionFactory, Queue queue){  
  2.    String msg = null;  
  3.    try (JMSContext context = connectionFactory.createContext();){  
  4.       JMSConsumer consumer = session.createConsumer(queue);  
  5.       msg = consumer.receiveBody(String.class);  
  6.    } catch (JMSRuntimeException ex) {  
  7.       // handle exception  
  8.    }  
  9.    return msg;  
  10. }  
public String receiveMessage20(ConnectionFactory connectionFactory, Queue queue){ String msg = null; try (JMSContext context = connectionFactory.createContext();){ JMSConsumer consumer = session.createConsumer(queue); msg = consumer.receiveBody(String.class); } catch (JMSRuntimeException ex) { // handle exception } return msg; }

 接收消息的代碼互相對比,,簡化的部分和發(fā)送消息是類似的,。需要特別說明的是:

 

    1、1.1需要調(diào)用connection.start()把消息分發(fā)到消費(fèi)者,。2.0里連接會(huì)自動(dòng)啟動(dòng),。

    2、1.1接收的是Message對象,,需要顯式進(jìn)行類型轉(zhuǎn)換,,再進(jìn)一步獲取消息體,而且在1.1里,,針對不同的消息類型,,獲取消息體的方法也是各自特定的。2.0里統(tǒng)一調(diào)用receiveBody方法就可以直接返回消息體,,或者獲取消息后統(tǒng)一用getBody方法獲取消息體,。

異步接收消息

前面接收消息的例子是同步的,接收方法在接收到消息之前會(huì)一直阻塞,,除非超時(shí),。

如果想異步接收消息,需要給MessageConsumer或JMSConsumer對象設(shè)置MessageListener接口的實(shí)現(xiàn)類,,讓MessageListener去處理消息(onMessage方法),。

不同之處仍然是1.1需要顯式調(diào)用connection.start(),。

JMS 1.1

 

Java代碼  收藏代碼
  1. MessageConsumer messageConsumer = session.createConsumer(queue);  
  2. messageConsumer.setMessageListener(msgListener);  
  3. connection.start();  
MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(msgListener); connection.start();

 

JMS 2.0

 

Java代碼  收藏代碼
  1. JMSConsumer consumer = context.createConsumer(queue);  
  2. consumer.setMessageListener(messageListener);  
JMSConsumer consumer = context.createConsumer(queue); consumer.setMessageListener(messageListener);

 注意,上面的示例代碼是在Java客戶端代碼里的寫法,。如果要在Web應(yīng)用或EJB應(yīng)用里異步接收消息,,需要使用消息驅(qū)動(dòng)Bean。

 

將JMSContext注入Java EE應(yīng)用

編寫Web應(yīng)用或EJB應(yīng)用時(shí),,使用JMS 2.0簡化的API會(huì)更加簡單,。因?yàn)镴ava EE 7的應(yīng)用服務(wù)器會(huì)負(fù)責(zé)JMSContext的創(chuàng)建和關(guān)閉,開發(fā)人員只需要把它注入應(yīng)用就可以了,。

 

Java代碼  收藏代碼
  1. // @Inject注解告訴容器創(chuàng)建JMSContext  
  2. @Inject   
  3. // @JMSConnectionFactory注解告訴容器應(yīng)用使用的連接工廠的JNDI名  
  4. @JMSConnectionFactory('jms/connectionFactory')   
  5. private JMSContext context;  
  6.   
  7. @Resource(lookup='jms/queue')  
  8. private Queue queue;  
  9.   
  10. public void sendMessage(String msg) {  
  11.    context.send(queue, msg);  
  12. }  
// @Inject注解告訴容器創(chuàng)建JMSContext @Inject // @JMSConnectionFactory注解告訴容器應(yīng)用使用的連接工廠的JNDI名 @JMSConnectionFactory('jms/connectionFactory') private JMSContext context; @Resource(lookup='jms/queue') private Queue queue; public void sendMessage(String msg) { context.send(queue, msg); }

 

更簡單的資源配置

對JMS應(yīng)用來說,,需要使用連接工廠和目的地(隊(duì)列或主題)。連接工廠需要知道JMS提供者所在的主機(jī)名和監(jiān)聽端口,,應(yīng)用通過連接工廠創(chuàng)建到JMS提供者的連接,;隊(duì)列或主題則要知道作為消息端點(diǎn)的物理隊(duì)列或物理主題。由于每個(gè)JMS提供者創(chuàng)建,、配置它們的方式不盡相同,,所以JMS推薦單獨(dú)創(chuàng)建、配置,,然后綁定到JNDI樹上,,應(yīng)用使用時(shí)再通過JNDI查找、直接獲取連接工廠或目的地對象,。Java EE規(guī)范推薦把代碼和配置分開,,是為了保證應(yīng)用的可移植性,代碼不需要知道具體的物理細(xì)節(jié)(像主機(jī)名,、端口,、物理名稱等)。

但對于大規(guī)模集群上部署的應(yīng)用來說,,代碼和配置分離反而可能會(huì)增加額外的工作量,,尤其是在集群不能統(tǒng)一配置的情形下。而且在這樣的生產(chǎn)環(huán)境下,,應(yīng)用重部署和遷移的情況相對較少,,那能否在應(yīng)用部署的過程中就一起把應(yīng)用要使用的資源對象給創(chuàng)建出來、省去管理員單獨(dú)配置的環(huán)節(jié)呢,?

JMS 2.0簡化了JMS資源的配置,,就適用于前面說的情形。應(yīng)用開發(fā)人員可以在代碼里使用2.0新增的JMS資源定義注解,,也可以在部署描述符里定義,,也可以兩者結(jié)合。

JMS資源定義注解

JMS資源定義注解主要有javax.jms.JMSConnectionFactoryDefinition和javax.jms.JMSDestinationDefinition,可以直接定義在EJB或Servlet里,。

 

Java代碼  收藏代碼
  1. @JMSConnectionFactoryDefinition(  
  2.     name='java:global/jms/connectionFactory',  
  3.     maxPoolSize = 30,  
  4.     minPoolSize= 20,  
  5.     properties = {  
  6.         'addressList=mq://localhost:7676',  
  7.         'reconnectEnabled=true'  
  8.     }  
  9. )   
  10. @JMSDestinationDefinition(  
  11.     name = 'java:global/jms/queue',  
  12.     interfaceName = 'javax.jms.Queue',  
  13.     destinationName = 'queue'  
  14.   )  
  15. public class TestServlet extends HttpServlet {  
  16.   ...  
  17. }  
@JMSConnectionFactoryDefinition( name='java:global/jms/connectionFactory', maxPoolSize = 30, minPoolSize= 20, properties = { 'addressList=mq://localhost:7676', 'reconnectEnabled=true' } ) @JMSDestinationDefinition( name = 'java:global/jms/queue', interfaceName = 'javax.jms.Queue', destinationName = 'queue' ) public class TestServlet extends HttpServlet { ... }

 

如果需要定義多個(gè)連接工廠或目的地,,JMSConnectionFactoryDefinition和JMSDestinationDefinition可以分別包含在MSConnectionFactoryDefinitions和JMSDestinationDefinitions注解里。類似于EJBs和EJB,。

 

Java代碼  收藏代碼
  1. @JMSConnectionFactoryDefinitions({  
  2.     @JMSConnectionFactoryDefinition(  
  3.        name='java:global/jms/connectionFactory1',  
  4.        maxPoolSize = 30,  
  5.        minPoolSize= 20,         
  6.        properties = {  
  7.           'addressList=mq://localhost:7676',  
  8.           'reconnectEnabled=true'  
  9.        }  
  10.     ),  
  11.     @JMSConnectionFactoryDefinition(  
  12.        name='java:global/jms/connectionFactory2',  
  13.        maxPoolSize = 30,  
  14.        minPoolSize= 20,  
  15.        properties = {  
  16.           'addressList=mq://localhost:7677',  
  17.           'reconnectEnabled=true'  
  18.        }  
  19.     )   
  20. })  
  21. @JMSDestinationDefinitions({  
  22.     @JMSDestinationDefinition(  
  23.        name='java:global/jms/queue1',  
  24.        interfaceName = 'javax.jms.Queue',  
  25.        destinationName = 'queue1'  
  26.     ),  
  27.     @JMSDestinationDefinition(  
  28.        name='java:global/jms/queue2',  
  29.        interfaceName = 'javax.jms.Queue',  
  30.        destinationName = 'queue2'  
  31.     )   
  32. })  
  33. public class TestServlet extends HttpServlet {  
  34.   ...  
  35. }  
@JMSConnectionFactoryDefinitions({ @JMSConnectionFactoryDefinition( name='java:global/jms/connectionFactory1', maxPoolSize = 30, minPoolSize= 20, properties = { 'addressList=mq://localhost:7676', 'reconnectEnabled=true' } ), @JMSConnectionFactoryDefinition( name='java:global/jms/connectionFactory2', maxPoolSize = 30, minPoolSize= 20, properties = { 'addressList=mq://localhost:7677', 'reconnectEnabled=true' } ) }) @JMSDestinationDefinitions({ @JMSDestinationDefinition( name='java:global/jms/queue1', interfaceName = 'javax.jms.Queue', destinationName = 'queue1' ), @JMSDestinationDefinition( name='java:global/jms/queue2', interfaceName = 'javax.jms.Queue', destinationName = 'queue2' ) }) public class TestServlet extends HttpServlet { ... }

需要注意的是,使用JMS資源定義注解定義的連接工廠和目的地必須放在java:comp,、java:module,、java:app或者java:global名字空間里,生命周期和應(yīng)用保持一致,,即部署應(yīng)用時(shí)生成,,解部署應(yīng)用時(shí)就會(huì)刪除。

部署描述符

如果不想讓資源的配置侵入代碼,,或者開發(fā)的時(shí)候還不確定JMS提供者的真實(shí)信息(比如主機(jī)地址和端口),,那可以選擇在應(yīng)用的部署描述里定義連接工廠和目的地,比如web.xml或ejb-jar.xml,。

Xml代碼  收藏代碼
  1. <jms-connection-factory>  
  2.    <name>java:global/jms/connectionFactory</name>  
  3.    <max-pool-size>30</max-pool-size>  
  4.    <min-pool-size>20</min-pool-size>  
  5.    <property>  
  6.       <name>addressList</name>  
  7.       <value>mq://localhost:7676</value>  
  8.    </property>  
  9.    <property>  
  10.       <name>reconnectEnabled</name>  
  11.       <value>true</value>  
  12.    </property>      
  13. </jms-connection-factory>  
  14.   
  15. <jms-destination>  
  16.    <name>java:global/jms/queue</name>  
  17.    <interfaceName>javax.jms.Queue</interfaceName>  
  18.    <destinationName>queue</destinationName>   
  19. </jms-destination>  
<jms-connection-factory> <name>java:global/jms/connectionFactory</name> <max-pool-size>30</max-pool-size> <min-pool-size>20</min-pool-size> <property> <name>addressList</name> <value>mq://localhost:7676</value> </property> <property> <name>reconnectEnabled</name> <value>true</value> </property> </jms-connection-factory> <jms-destination> <name>java:global/jms/queue</name> <interfaceName>javax.jms.Queue</interfaceName> <destinationName>queue</destinationName> </jms-destination>

延遲發(fā)送

2.0里,,消息生產(chǎn)者可以指定一個(gè)延遲發(fā)送時(shí)間,JMS提供者等過了這么長時(shí)間之后再發(fā)送消息,。

這個(gè)功能具體體現(xiàn)在MessageProducer和JMSProducer的API里,。

MessageProducer

Java代碼  收藏代碼
  1. MessageProducer messageProducer = session.createProducer(queue);  
  2. messageProducer.setDeliveryDelay(20000); // 20000毫秒,發(fā)送消息之前設(shè)置  
  3. TextMessage textMessage = session.createTextMessage('Hello World!');  
  4. messageProducer.send(textMessage);  
MessageProducer messageProducer = session.createProducer(queue); messageProducer.setDeliveryDelay(20000); // 20000毫秒,,發(fā)送消息之前設(shè)置 TextMessage textMessage = session.createTextMessage('Hello World!'); messageProducer.send(textMessage);

JMSProducer

Java代碼  收藏代碼
  1. try (JMSContext context = connectionFactory.createContext();){  
  2.    context.createProducer().setDeliveryDelay(20000).send(queue, 'Hello World!'); // 20000毫秒,,也是在發(fā)送消息之前設(shè)置  
  3. }  
try (JMSContext context = connectionFactory.createContext();){ context.createProducer().setDeliveryDelay(20000).send(queue, 'Hello World!'); // 20000毫秒,也是在發(fā)送消息之前設(shè)置 }

異步發(fā)送消息

2.0里新的send方法允許應(yīng)用異步發(fā)送消息,。

傳統(tǒng)的消息發(fā)送方式是阻塞的,,消息安全發(fā)送到JMS提供者之后,發(fā)送方法才會(huì)返回,。如果消息是持久化的,,那中間的阻塞時(shí)間還要算上持久化花費(fèi)的時(shí)間(寫文件或?qū)憯?shù)據(jù)庫,或者其他的存儲(chǔ)介質(zhì)),。

異步發(fā)送在發(fā)送完消息之后并不會(huì)等待JMS提供者的應(yīng)答,,而是繼續(xù)做其它事情。JMS提供者收到消息,、做完持久化之后,,會(huì)回調(diào)應(yīng)用指定的CompletionListener的onCompletion方法,通知客戶端應(yīng)用消息發(fā)送的結(jié)果,。這樣的話,,客戶端應(yīng)用的執(zhí)行效率就可以大大提升。

新的JMSProducer和以前就有的MessageProducer都支持異步發(fā)送,。我們來看看具體的例子,。

CompletionListener

Java代碼  收藏代碼
  1. class SampleCompletionListener implements CompletionListener {  
  2.    CountDownLatch latch;  
  3.    Exception exception;  
  4.      
  5.    public SampleCompletionListener(CountDownLatch latch) {  
  6.       this.latch=latch;  
  7.    }  
  8.   
  9.    @Override  
  10.    public void onCompletion(Message message) {  
  11.       latch.countDown();  
  12.    }  
  13.   
  14.    @Override  
  15.    public void onException(Message message, Exception exception) {  
  16.       latch.countDown();  
  17.       this.exception = exception;  
  18.    }  
  19.   
  20.    public Exception getException(){  
  21.       return exception;  
  22.    }  
  23. }  
class SampleCompletionListener implements CompletionListener { CountDownLatch latch; Exception exception; public SampleCompletionListener(CountDownLatch latch) { this.latch=latch; } @Override public void onCompletion(Message message) { latch.countDown(); } @Override public void onException(Message message, Exception exception) { latch.countDown(); this.exception = exception; } public Exception getException(){ return exception; } }

MessageProducer

Java代碼  收藏代碼
  1. private void asyncSend11(ConnectionFactory connectionFactory, Queue queue) throws Exception {  
  2.    try (Connection connection = connectionFactory.createConnection();){  
  3.       Session session = connection.createSession();  
  4.       MessageProducer messageProducer = session.createProducer(queue);  
  5.       TextMessage textMessage = session.createTextMessage('Hello World!');  
  6.       CountDownLatch latch = new CountDownLatch(1);  
  7.       SampleCompletionListener completionListener = new SampleCompletionListener(latch);  
  8.       messageProducer.send(textMessage, completionListener);  
  9.       System.out.println('Waiting for a reply...');  
  10.   
  11.       // 做其他事情,,不讓應(yīng)用閑等  
  12.   
  13.       latch.await();  
  14.   
  15.       Exception exception = completionListener.getException();  
  16.       if (exception == null){  
  17.          System.out.println('Send message successfully.');  
  18.       } else {  
  19.          System.out.println('Failed to send message. '   exception.getMessage());  
  20.       }  
  21.    }  
  22. }  
private void asyncSend11(ConnectionFactory connectionFactory, Queue queue) throws Exception { try (Connection connection = connectionFactory.createConnection();){ Session session = connection.createSession(); MessageProducer messageProducer = session.createProducer(queue); TextMessage textMessage = session.createTextMessage('Hello World!'); CountDownLatch latch = new CountDownLatch(1); SampleCompletionListener completionListener = new SampleCompletionListener(latch); messageProducer.send(textMessage, completionListener); System.out.println('Waiting for a reply...'); // 做其他事情,不讓應(yīng)用閑等 latch.await(); Exception exception = completionListener.getException(); if (exception == null){ System.out.println('Send message successfully.'); } else { System.out.println('Failed to send message. ' exception.getMessage()); } } }

JMSProducer

Java代碼  收藏代碼
  1. private void asyncSend20(ConnectionFactory connectionFactory, Queue queue) throws Exception {  
  2.    try (JMSContext context = connectionFactory.createContext();){  
  3.       CountDownLatch latch = new CountDownLatch(1);  
  4.       SampleCompletionListener completionListener = new SampleCompletionListener(latch);  
  5.       context.createProducer().setAsync(completionListener).send(queue, 'Hello World!');  
  6.       System.out.println('Waiting for a reply...');  
  7.   
  8.       // 做其他事情,,不讓應(yīng)用閑等  
  9.   
  10.       latch.await();  
  11.   
  12.       Exception exception = completionListener.getException();  
  13.       if (exception == null){  
  14.          System.out.println('Send message successfully.');  
  15.       } else {  
  16.          System.out.println('Failed to send message. '   exception.getMessage());  
  17.       }  
  18.    }  
  19.  }  
private void asyncSend20(ConnectionFactory connectionFactory, Queue queue) throws Exception { try (JMSContext context = connectionFactory.createContext();){ CountDownLatch latch = new CountDownLatch(1); SampleCompletionListener completionListener = new SampleCompletionListener(latch); context.createProducer().setAsync(completionListener).send(queue, 'Hello World!'); System.out.println('Waiting for a reply...'); // 做其他事情,,不讓應(yīng)用閑等 latch.await(); Exception exception = completionListener.getException(); if (exception == null){ System.out.println('Send message successfully.'); } else { System.out.println('Failed to send message. ' exception.getMessage()); } } }

不過這個(gè)特性不適用于Java EE的Web容器和EJB容器,只可用于Java SE應(yīng)用,、或者是Java EE Application Client容器,。

多個(gè)消費(fèi)者共享同一個(gè)主題訂閱

在JMS 1.1里,主題訂閱和消費(fèi)者是一一對應(yīng)的,,也就是在同一時(shí)間,,一個(gè)主題訂閱只能有一個(gè)消費(fèi)者。

Java代碼  收藏代碼
  1. private void createConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {  
  2.    Connection connection = connectionFactory.createConnection();  
  3.    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  4.    MessageConsumer messageConsumer = session.createConsumer(topic);  
  5.    connection.start();  
  6.    Message message = messageConsumer.receive(10000);  
  7.    while (message != null) {  
  8.       System.out.println('Message received: '   ((TextMessage) message).getText());  
  9.       message = messageConsumer.receive(10000);  
  10.    }  
  11.    connection.close();  
  12. }  
private void createConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException { Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(topic); connection.start(); Message message = messageConsumer.receive(10000); while (message != null) { System.out.println('Message received: ' ((TextMessage) message).getText()); message = messageConsumer.receive(10000); } connection.close(); }

如果我們想調(diào)用createConsumer創(chuàng)建多個(gè)消費(fèi)者,,那每個(gè)消費(fèi)者各有一個(gè)訂閱,,每個(gè)消費(fèi)者都會(huì)從主題接收到一份消息拷貝。舉例來說,,有兩個(gè)消費(fèi)者A和B,,主題topic上有三條消息x、y,、z,,那A會(huì)拿到x、y,、z的一份拷貝,,B也會(huì)拿到x、y,、z的一份拷貝,。要是想A和B能合起來消費(fèi)x、y,、z,,即A消費(fèi)x、z,,B消費(fèi)y,,那只能是A和B共享同一個(gè)訂閱了,可惜1.1并不支持共享的訂閱,。

2.0里可以指定訂閱是“可共享的”,,不論訂閱是持久的還是非持久的,而且Session和JMSContext都支持這個(gè)特性,。這樣的話,,消息的處理就可以由多線程、或者多個(gè)不同的連接、甚至多個(gè)Java進(jìn)程同時(shí)進(jìn)行,,顯而易見,,這很利于提升應(yīng)用的可伸縮性。

Java代碼  收藏代碼
  1. private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {  
  2.    Connection connection = connectionFactory.createConnection();  
  3.    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  4.    MessageConsumer messageConsumer = session.createSharedConsumer(topic, 'ourSubscription'); // 需要指定共享訂閱的名稱,,以便多個(gè)消費(fèi)者能確定彼此共享的訂閱  
  5.    connection.start();  
  6.    Message message = messageConsumer.receive(10000);  
  7.    while (message != null) {  
  8.       System.out.println('Message received: '   ((TextMessage) message).getText());  
  9.       message = messageConsumer.receive(10000);  
  10.    }  
  11.    connection.close();  
  12. }  
private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException { Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createSharedConsumer(topic, 'ourSubscription'); // 需要指定共享訂閱的名稱,,以便多個(gè)消費(fèi)者能確定彼此共享的訂閱 connection.start(); Message message = messageConsumer.receive(10000); while (message != null) { System.out.println('Message received: ' ((TextMessage) message).getText()); message = messageConsumer.receive(10000); } connection.close(); }

前面的例子創(chuàng)建的是非持久的訂閱,同樣的,,持久訂閱在2.0里也是可以共享的,,而且Session和JMSContext也都支持。

Java代碼  收藏代碼
  1. MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic, 'ourDurableSubscription');  
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic, 'ourDurableSubscription');

當(dāng)然,,2.0依舊支持不共享的訂閱。  

JMS提供者必須設(shè)置消息屬性JMSXDeliveryCount

JMS消息由三部分組成:

  • 消息頭:消息頭是所有消息都支持的,,客戶端和JMS提供者都能使用,,用于消息的區(qū)分和路由。
  • 屬性:消息頭的補(bǔ)充,。其中一部分是JMS定義的標(biāo)準(zhǔn)屬性(JMSX開頭),,其他的可以是應(yīng)用特定的,也可以是JMS提供者特定的,。
  • 消息體

標(biāo)準(zhǔn)屬性里,,有一個(gè)JMSXDeliveryCount(int類型),它由JMS提供者設(shè)置,,表示JMS提供者給消費(fèi)者發(fā)送消息的嘗試次數(shù),。第一次發(fā)送消息的時(shí)候,該屬性的值設(shè)置為1,,然后每嘗試發(fā)送一次,,值就加一。如果該屬性的值為N,,那就說明前N-1次都發(fā)送失敗了,。消費(fèi)者如果不做特殊的處理,那會(huì)一直重復(fù)發(fā)送,。

消費(fèi)者可以根據(jù)這個(gè)值確認(rèn)消息是否被重復(fù)發(fā)送了,,進(jìn)而可以進(jìn)行特殊的處理,比如把這條消息放入死信隊(duì)列等,。

JMSXDeliveryCount在1.1里就有,,只是是可選的,2.0里則變成了必需的,。下面是一個(gè)借助JMSXDeliveryCount屬性的值進(jìn)行不同處理的例子:

Java代碼  收藏代碼
  1. class SampleMessageListener implements MessageListener {  
  2.    @Override  
  3.    public void onMessage(Message message) {  
  4.       try {  
  5.          int deliveryCount = message.getIntProperty('JMSXDeliveryCount');  
  6.          if (deliveryCount < 10){  
  7.              // 故意拋出運(yùn)行時(shí)一場,,模擬消息處理失敗的情形,使得JMS提供者能重發(fā)消息  
  8.              throw new RuntimeException('Exception thrown to simulate a bad message');  
  9.          } else {  
  10.              // 消息已經(jīng)被發(fā)送了10次,放棄重發(fā),,進(jìn)行其他的處理  
  11.          }  
  12.       } catch (JMSException e) {  
  13.          throw new RuntimeException(e);  
  14.       }  
  15.    }  
  16. }  
class SampleMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { int deliveryCount = message.getIntProperty('JMSXDeliveryCount'); if (deliveryCount < 10){ // 故意拋出運(yùn)行時(shí)一場,,模擬消息處理失敗的情形,使得JMS提供者能重發(fā)消息 throw new RuntimeException('Exception thrown to simulate a bad message'); } else { // 消息已經(jīng)被發(fā)送了10次,,放棄重發(fā),,進(jìn)行其他的處理 } } catch (JMSException e) { throw new RuntimeException(e); } } }

 

JMS 2.0主要的新特性就介紹完了,其實(shí)在細(xì)節(jié)上還有不少變化,,大家可以參考JMS 2.0規(guī)范和API文檔理解更多內(nèi)容,。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請遵守用戶 評(píng)論公約

    類似文章 更多