本文使用的是CXF 2.7.3版本,。轉(zhuǎn)貼請(qǐng)注明出處為ITEYE?。,。。,。,。?/p>
1.什么是WS-N
WS-N全稱是WebService Notification,,屬于WS-*標(biāo)準(zhǔn)中的一個(gè),。
該標(biāo)準(zhǔn)主要由IBM等提出(微軟等提出的是WS-Eventing),,主要用于Publish/Subscribe方式的"Notifications"或者 "Events"通知,而publish 方和 subscription方不需要事先知道對(duì)端的webservice服務(wù)地址,。
WS-N有兩種實(shí)現(xiàn):
a.WebService base Notification : 主要用于點(diǎn)對(duì)點(diǎn)
b.WebService Broker Notification : 完整的Publish/Subscribe機(jī)制的實(shí)現(xiàn),,需要一個(gè)Broker
該標(biāo)準(zhǔn)已經(jīng)被OASIS定為國際標(biāo)準(zhǔn)。
2.CXF對(duì)WS-N的實(shí)現(xiàn)
CXF號(hào)稱是支持WS-N了,,實(shí)際上僅僅是實(shí)現(xiàn)了WS-N(Broker方式,,當(dāng)然也支持了PullPoint這種客戶端去拉消息的方式),并沒有將WebService Base Notification也一并實(shí)現(xiàn),。
3.使用CXF WS-N進(jìn)行開發(fā)
準(zhǔn)備條件:需要一個(gè)JMS消息服務(wù)器,,可以使用ActiveMQ或者JBOSS MQ(本文使用ActiveMQ)
A.啟動(dòng)ActiveMQ消息服務(wù)器
去ActiveMQ的bin目錄,運(yùn)行activemq.bat
【注意】
如果是輕量級(jí)的APP,,不需要單獨(dú)啟動(dòng)ACTIVEMQ服務(wù),。
直接在JVM里面啟動(dòng)消息服務(wù)器,使用vm的embed(內(nèi)置)的ActiveMQJMS服務(wù)器,,而且不需要持久化JMS 消息,。
內(nèi)置的JMS服務(wù),有兩種創(chuàng)建方式:
(1)使用Srping創(chuàng)建embed的jms broker
- <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
- <property name="config"
- value="classpath:activemq/activemq.xml" />
- <property name="persistent" value="false"></property>
- <property name="start" value="true" />
- <property name="brokerName" value="test"></property>
- </bean>
再使用 activeMQConnectionFactory連接:
- <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <constructor-arg>
-
- <value>vm://test</value>
- </constructor-arg>
- </bean>
(2)使用activeMQConnectionFactory直接進(jìn)行embed jms broker創(chuàng)建
- <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <constructor-arg>
- <value>vm:(broker:(tcp://localhost:19999)?persistent=false)</value>
- </constructor-arg>
- </bean>
【注意】
如
果程序中啟動(dòng)了多個(gè)不同名字的VM broker,,那么可能會(huì)有如下警告:Failed to start jmx connector: Cannot
bind to URL [rmi://localhost:1099/jmxrmi]:
javax.naming.NameAlreadyBoundException…可以通過在transportOptions中追加
broker.useJmx=false來禁用JMX來避免這個(gè)警告,。
由于編碼的問題,如果通過URL的Query方式設(shè)置多個(gè)參數(shù)的話,,需要使用"&"來代替"&"
例如:
- <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <constructor-arg>
- <value>vm:(broker:(tcp://localhost:19999)?persistent=false&useJmx=false)</value>
- </bean>
【注意2】
如果不需要發(fā)布TCP的訪問端口,甚至可以這樣:
那么現(xiàn)在就成了徹底的內(nèi)置JMS服務(wù)了,,需要將ConnectionFactory注入到業(yè)務(wù)類才能訪問了
- <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <constructor-arg>
- <value>vm://localhost?broker.useJmx=false&broker.persistent=false</value>
- </constructor-arg>
- </bean>
B.配置應(yīng)用與ActiveMQ消息服務(wù)器的連接
這里使用了Spring進(jìn)行管理,,Spring配置:
- <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <constructor-arg>
- <value>tcp:
- </constructor-arg>
- </bean>
-
- <bean id="singleConnectionFactory"
- class="org.springframework.jms.connection.SingleConnectionFactory">
- <property name="targetConnectionFactory" ref="activeMQConnectionFactory"></property>
- </bean>
C.配置Broker
配置Broker需要指定Broker的名稱(構(gòu)造函數(shù)第一個(gè)參數(shù)),注入ConnectionFactory(上面配置的)
還需要指定Broker地址
- <bean id="notificationBroker" class="org.apache.cxf.wsn.servicesJmsNotificationBroker"
- init-method="init">
- <constructor-arg index="0" type="java.lang.String" name="name" >
- <value>WSNotificationBroker</value>
- </constructor-arg>
- <constructor-arg index="1" name="connectionFactory" >
- <ref bean="activeMQConnectionFactory"/>
- </constructor-arg>
- <property name="address"
- value="http://localhost:19800/wsn/NotificationBroker"></property>
-
- </bean>
D.注冊(cè)Consumer
(1)需要先有一個(gè)Client端的NotificationBroker代理對(duì)象
當(dāng)然您也可以在代碼里面new出來(該對(duì)象的意思是代理了真正的Broker,,該clientNotificationBroker的地址需要和真正的Broker發(fā)布地址一樣,,從而指向真正的Broker)
- <bean id="clientNotificationBroker" class="org.apache.cxf.wsn.client.NotificationBroker">
- <constructor-arg index="0" type="java.lang.String" name="name" >
- <value>http:
- </constructor-arg>
- </bean>
(2)將這個(gè)對(duì)象注入到您的業(yè)務(wù)類里面
(3)發(fā)布生成一個(gè)Consumer對(duì)象,包括對(duì)應(yīng)的CallBack,。
Callback類:
- public class TestConsumer implements Consumer.Callback {
-
- final List<NotificationMessageHolderType> notifications
- = new ArrayList<NotificationMessageHolderType>();
-
- public void notify(NotificationMessageHolderType message) {
- synchronized (notifications) {
- notifications.add(message);
- notifications.notify();
- }
- }
- }
生成Consumer
- TestConsumer callback = new TestConsumer();
- Consumer consumer = new Consumer(callback, "<span style="font-size: 14px;">http://localhost:19800/wsn/NotificationBroker</span><span style="font-size: 14px;">");</span>
當(dāng)然也可以采用Spring生成:
- <bean id="callback" class="org.apache.cxf.wsn.client.Consumer.Callback">
- </bean>
-
- <bean id="consumer" class="org.apache.cxf.wsn.client.Consumer">
- <constructor-arg index="0"name="callback" >
- <ref bean="callback"/>
- </constructor-arg>
- <constructor-arg index="1"name="address" >
- <value>http://localhost:19100/test/consumer</value>
- </constructor-arg>
- <constructor-arg index="2"type="java.lang.Class" name="extraClasses" >
- <value>org.apache.cxf.wsn.type.CustomType</value>
- </constructor-arg>
- </bean>
(4)注冊(cè)Consumer
注冊(cè)不建議用Spring注入,,因?yàn)榭赡軐?duì)端服務(wù)沒啟動(dòng),會(huì)調(diào)用失敗,。主要有失敗中心注冊(cè)的處理機(jī)制,。
- <span style="font-size: 14px;">clientNotificationBroker</span>.subscribe(consumer, "myTopic");
D.注冊(cè)Publisher
類似上面的Consumer,我們這里只給出代碼形式的注冊(cè)Publser了,,Spring方式參考上面的Consumer
需要注意的是,,注冊(cè)publisher之后返回的registration可以用來銷毀注冊(cè),需要保存下來
另外真正調(diào)用Notify不是通過publisher調(diào)用,,而是上文提到的Client端的Broker調(diào)用其Notify方法,。
- public class PublisherCallback implements Publisher.Callback {
- final CountDownLatch subscribed = new CountDownLatch(1);
- final CountDownLatch unsubscribed = new CountDownLatch(1);
-
- public void subscribe(TopicExpressionType topic) {
- subscribed.countDown();
- }
-
- public void unsubscribe(TopicExpressionType topic) {
- unsubscribed.countDown();
- }
- }
- PublisherCallback publisherCallback = new PublisherCallback();
- Publisher publisher = new Publisher(publisherCallback, "http://localhost:19000/test/publisher");
- Registration registration = <span style="font-size: 14px;">clientNotificationBroker</span><span style="font-size: 14px;">.registerPublisher(publisher, "myTopic");</span>
E.發(fā)送Notify
上文提到了一個(gè)Client Broker:
- <bean id="clientNotificationBroker" class="org.apache.cxf.wsn.client.NotificationBroker"
- </bean>
使用它注入到您需要發(fā)送Notify的業(yè)務(wù)類,,調(diào)用:
- <span style="font-size: 14px;">clientNotificationBroker</span>.notify(publisher, "myTopic", new CustomType(1, 2)
【OVER】
這樣客戶端就能收到Publish的消息了。
其實(shí)整體的架構(gòu)是這樣子的:
|