關(guān)鍵字: activemq
2.5 Clustering ActiveMQ從多種不同的方面提供了集群的支持,。 2.5.1 Queue consumer clusters ActiveMQ支
持訂閱同一個(gè)queue的consumers上的集群。如果一個(gè)consumer失效,,那么所有未被確認(rèn)(unacknowledged)的消息都會(huì)被發(fā)
送到這個(gè)queue上其它的consumers。如果某個(gè)consumer的處理速度比其它c(diǎn)onsumers更快,,那么這個(gè)consumer就會(huì)消費(fèi)更
多的消息,。 需要注意的是,筆者發(fā)現(xiàn)AcitveMQ5.0版本的Queue consumer clusters存在一個(gè)bug:采用AMQ Message Store,,運(yùn)行一個(gè)producer,,兩個(gè)consumer,并采用如下的配置文件:
- <beans>
- <broker xmlns="http:///config/1.0" brokerName="BugBroker1" useJmx="true">
-
- <transportConnectors>
- <transportConnector uri="tcp://localhost:61616"/>
- </transportConnectors>
-
- <persistenceAdapter>
- <amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>
- </persistenceAdapter>
-
- </broker>
- </beans>
<beans>
<broker xmlns="http:///config/1.0" brokerName="BugBroker1" useJmx="true">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<persistenceAdapter>
<amqPersistenceAdapter directory="activemq-data/BugBroker1" maxFileLength="32mb"/>
</persistenceAdapter>
</broker>
</beans>
那么經(jīng)過(guò)一段時(shí)間后可能會(huì)報(bào)出如下錯(cuò)誤: ERROR [ActiveMQ
Transport: tcp:///127.0.0.1:1843 - RecoveryListenerAdapter.java:58 -
RecoveryListenerAdapter] Message id
ID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered from
the data store! Apache官方文檔說(shuō),,此bug已經(jīng)被修正,,預(yù)定在5.1.0版本上體現(xiàn),。
2.5.2 Broker clusters 一個(gè)常見(jiàn)的場(chǎng)景是有多個(gè)JMS broker,有一個(gè)客戶連接到其中一個(gè)broker,。如果這個(gè)broker失效,,那么客戶會(huì)自動(dòng)重新連接到其它的broker。在ActiveMQ中使用failover:// 協(xié)議來(lái)實(shí)現(xiàn)這個(gè)功能,。ActiveMQ3.x版本的reliable://協(xié)議已經(jīng)變更為failover://,。
如果某個(gè)網(wǎng)絡(luò)上有多個(gè)brokers而且客戶使用靜態(tài)發(fā)現(xiàn)(使用Static Transport或Failover
Transport)或動(dòng)態(tài)發(fā)現(xiàn)(使用Discovery
Transport),那么客戶可以容易地在某個(gè)broker失效的情況下切換到其它的brokers,。然而,,stand alone
brokers并不了解其它brokers上的consumers,也就是說(shuō)如果某個(gè)broker上沒(méi)有consumers,,那么這個(gè)broker上的消
息可能會(huì)因得不到處理而積壓起來(lái),。目前的解決方案是使用Network of brokers,以便在broker之間存儲(chǔ)轉(zhuǎn)發(fā)消息,。ActiveMQ在未來(lái)會(huì)有更好的特性,,用來(lái)在客戶端處理這個(gè)問(wèn)題。 從ActiveMQ1.1版本起,,ActiveMQ支
持networks of
brokers,。它支持分布式的queues和topics。一個(gè)broker會(huì)相同對(duì)待所有的訂閱(subscription):不管他們是來(lái)自本地的
客戶連接,,還是來(lái)自遠(yuǎn)程broker,,它都會(huì)遞送有關(guān)的消息拷貝到每個(gè)訂閱。遠(yuǎn)程broker得到這個(gè)消息拷貝后,,會(huì)依次把它遞送到其內(nèi)部的本地連接上,。
有兩種方式配置Network of brokers,一種是使用static transport,,如下:
- <broker brokerName="receiver" persistent="false" useJmx="false">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:62002"/>
- </transportConnectors>
- <networkConnectors>
- <networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
- </networkConnectors>
- …
- </broker>
<broker brokerName="receiver" persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:62002"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
</networkConnectors>
…
</broker>
另外一種是使用multicast discovery,,如下:
- <broker name="sender" persistent="false" useJmx="false">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
- </transportConnectors>
- <networkConnectors>
- <networkConnector uri="multicast://default"/>
- </networkConnectors>
- ...
- </broker>
<broker name="sender" persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
...
</broker>
Network Connector有以下屬性:
Property |
Default Value |
Description |
name |
bridge |
name of the network - for more than one network connector between the same two brokers - use different names |
dynamicOnly |
false |
if true, only forward messages if a consumer is active on the connected broker |
decreaseNetworkConsumerPriority |
false |
decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer |
networkTTL |
1 |
the number of brokers in the network that messages and subscriptions can pass through |
conduitSubscriptions |
true |
multiple consumers subscribing to the same destination are treated as one consumer by the network |
excludedDestinations |
empty |
destinations matching this list won't be forwarded across the network |
dynamicallyIncludedDestinations |
empty |
destinations
that match this list will be forwarded across the network n.b. an empty
list means all destinations not in the excluded list will be forwarded |
staticallyIncludedDestinations |
empty |
destinations that match will always be passed across the network - even if no consumers have ever registered an interest |
duplex |
false |
if
true, a network connection will be used to both produce AND Consume
messages. This is useful for hub and spoke scenarios when the hub is
behind a firewall etc.
|
關(guān)于conduitSubscriptions屬性,這里稍稍說(shuō)明一下,。設(shè)想有兩個(gè)brokers,,分別是brokerA和brokerB,它們之間用
forwarding
bridge連接,。有一個(gè)consumer連接到brokerA并訂閱queue:Q.TEST,。有兩個(gè)consumers連接到brokerB,也是訂
閱queue:Q.TEST,。這三個(gè)consumers有相同的優(yōu)先級(jí),。然后啟動(dòng)一個(gè)producer,它發(fā)送了30條消息到brokerA。如果
conduitSubscriptions=true,,那么brokerA上的consumer會(huì)得到15條消息,,
另外15條消息會(huì)發(fā)送給brokerB。此時(shí)負(fù)載并不均衡,,因?yàn)榇藭r(shí)brokerA將brokerB上的兩個(gè)consumers視為一個(gè),;如果
conduitSubscriptions=false,那么每個(gè)consumer上都會(huì)收到10條消息,。以下是關(guān)于NetworkConnector屬
性的一個(gè)例子:
- <networkConnectors>
- <networkConnector uri="static://(tcp://localhost:61617)"
- name="bridge" dynamicOnly="false" conduitSubscriptions="true"
- decreaseNetworkConsumerPriority="false">
- <excludedDestinations>
- <queue physicalName="exclude.test.foo"/>
- <topic physicalName="exclude.test.bar"/>
- </excludedDestinations>
- <dynamicallyIncludedDestinations>
- <queue physicalName="include.test.foo"/>
- <topic physicalName="include.test.bar"/>
- </dynamicallyIncludedDestinations>
- <staticallyIncludedDestinations>
- <queue physicalName="always.include.queue"/>
- <topic physicalName="always.include.topic"/>
- </staticallyIncludedDestinations>
- </networkConnector>
- </networkConnectors>
<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)"
name="bridge" dynamicOnly="false" conduitSubscriptions="true"
decreaseNetworkConsumerPriority="false">
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<staticallyIncludedDestinations>
<queue physicalName="always.include.queue"/>
<topic physicalName="always.include.topic"/>
</staticallyIncludedDestinations>
</networkConnector>
</networkConnectors>
2.5.3 Master Slave
在一個(gè)網(wǎng)絡(luò)內(nèi)運(yùn)行多個(gè)brokers或者stand alone
brokers時(shí)存在一個(gè)問(wèn)題,,這就是消息在物理上只被一個(gè)broker持有,因此當(dāng)某個(gè)broker失效,,那么你只能等待直到它重啟后,,這個(gè)
broker上的消息才能夠被繼續(xù)發(fā)送(如果沒(méi)有設(shè)置持久化,那么在這種情況下,,消息將會(huì)丟失),。Master Slave
背后的想法是,消息被復(fù)制到slave broker,,因此即使master broker遇到了像硬件故障之類(lèi)的錯(cuò)誤,,你也可以立即切換到slave
broker而不丟失任何消息。 Master Slave是目前ActiveMQ推薦的高可靠性和容錯(cuò)的解決方案,。以下是幾種不同的類(lèi)型:
Master Slave Type |
Requirements |
Pros |
Cons |
Pure Master Slave |
None |
No central point of failure |
Requires manual restart to bring back a failed master and can only support 1 slave |
Shared File System Master Slave |
A Shared File system such as a SAN |
Run as many slaves as required. Automatic recovery of old masters |
Requires shared file system |
JDBC Master Slave |
A Shared database |
Run as many slaves as required. Automatic recovery of old masters |
Requires a shared database. Also relatively slow as it cannot use the high performance journal |
2.5.3.1 Pure Master Slave Pure Master Slave的工作方式如下:
- Slave
broker消費(fèi)master broker上所有的消息狀態(tài),,例如消息、確認(rèn)和事務(wù)狀態(tài)等,。只要slave broker連接到了master
broker,,它不會(huì)(也不被允許)啟動(dòng)任何network connectors或者transport
connectors,所以唯一的目的就是復(fù)制master broker的狀態(tài),。
- Master broker只有在消息成功被復(fù)制到slave broker之后才會(huì)響應(yīng)客戶,。例如,客戶的commit請(qǐng)求只有在master broker和slave broker都處理完畢commit請(qǐng)求之后才會(huì)結(jié)束,。
- 當(dāng)master
broker失效的時(shí)候,,slave broker有兩種選擇,一種是slave broker啟動(dòng)所有的network
connectors和transport connectors,,這允許客戶端切換到slave broker,;另外一種是slave
broker停止。這種情況下,,slave broker只是復(fù)制了master broker的狀態(tài),。
- 客戶應(yīng)該使用failover transport并且應(yīng)該首先嘗試連接master broker。例如:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false 設(shè)置randomize為false就可以讓客戶總是首先嘗試連接master broker(slave broker并不會(huì)接受任何連接,,直到它成為了master broker),。
Pure Master Slave具有以下限制:
- 只能有一個(gè)slave broker連接到master broker。
- 在因master broker失效而導(dǎo)致slave broker成為master之后,,之前的master broker只有在當(dāng)前的master broker(原slave broker)停止后才能重新生效,。
- Master
broker失效后而切換到slave broker后,最安全的恢復(fù)master broker的方式是人工處理,。首先要停止slave
broker(這意味著所有的客戶也要停止),。然后把slave broker的數(shù)據(jù)目錄中所有的數(shù)據(jù)拷貝到master
broker的數(shù)據(jù)目錄中。然后重啟master broker和slave broker,。
Master broker不需要特殊的配置,。Slave broker需要進(jìn)行以下配置
- <broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">
- ...
- <transportConnectors>
- <transportConnector uri="tcp://slavehost:61616"/>
- </transportConnectors>
- </broker>
<broker masterConnectorURI="tcp://masterhost:62001" shutdownOnMasterFailure="false">
...
<transportConnectors>
<transportConnector uri="tcp://slavehost:61616"/>
</transportConnectors>
</broker>
其中的masterConnectorURI用于指向master broker,shutdownOnMasterFailure用于指定slave broker在master broker失效的時(shí)候是否需要停止,。此外,,也可以使用如下配置:
- <broker brokerName="slave" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http:///config/1.0">
- ...
- <services>
- <masterConnector remoteURI= "tcp://localhost:62001" userName="user" password="password"/>
- </services>
- </broker>
<broker brokerName="slave" useJmx="false" deleteAllMessagesOnStartup="true" xmlns="http:///config/1.0">
...
<services>
<masterConnector remoteURI= "tcp://localhost:62001" userName="user" password="password"/>
</services>
</broker>
需要注意的是,筆者認(rèn)為ActiveMQ5.0版本的Pure Master Slave仍然不夠穩(wěn)定,。
2.5.3.2 Shared File System Master Slave
如果你使用SAN或者共享文件系統(tǒng),,那么你可以使用Shared File System Master
Slave?;旧?,你可以運(yùn)行多個(gè)broker,這些broker共享數(shù)據(jù)目錄,。當(dāng)?shù)谝粋€(gè)broker得到文件上的排他鎖之后,,其它的broker便會(huì)
在循環(huán)中等待獲得這把鎖??蛻舳耸褂胒ailover transport來(lái)連接到可用的broker,。當(dāng)master
broker失效的時(shí)候會(huì)釋放這把鎖,這時(shí)候其中一個(gè)slave broker會(huì)得到這把鎖從而成為master broker,。以下是ActiveMQ配置的一個(gè)例子:
- <broker useJmx="false" xmlns="http:///config/1.0">
- <persistenceAdapter>
- <journaledJDBC dataDirectory="/sharedFileSystem/broker"/>
- </persistenceAdapter>
- …
- </broker>
<broker useJmx="false" xmlns="http:///config/1.0">
<persistenceAdapter>
<journaledJDBC dataDirectory="/sharedFileSystem/broker"/>
</persistenceAdapter>
…
</broker>
2.5.3.3 JDBC Master Slave JDBC Master Slave的工作原理跟Shared File System Master Slave類(lèi)似,,只是采用了數(shù)據(jù)庫(kù)作為持久化存儲(chǔ)。以下是ActiveMQ配置的一個(gè)例子:
- <beans>
- <broker xmlns="http:///config/1.0" brokerName="JdbcMasterBroker">
- ...
- <persistenceAdapter>
- <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
- </persistenceAdapter>
-
- </broker>
-
- <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
- <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
- <property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
- <property name="username" value="username"/>
- <property name="password" value="passward"/>
- <property name="poolPreparedStatements" value="true"/>
- </bean>
- </beans>
<beans>
<broker xmlns="http:///config/1.0" brokerName="JdbcMasterBroker">
...
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>
<property name="username" value="username"/>
<property name="password" value="passward"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
需要注意的是,,如果你使用MySQL數(shù)據(jù)庫(kù),,需要首先執(zhí)行以下三條語(yǔ)句:(Apache官方文檔說(shuō),此bug已經(jīng)被修正,,預(yù)定在5.1.0版本上體現(xiàn))
- ALTER TABLE activemq_acks ENGINE = InnoDB;
- ALTER TABLE activemq_lock ENGINE = InnoDB;
- ALTER TABLE activemq_msgs ENGINE = InnoDB;
|