大家好,我是君哥,。 我們在做消息隊列的技術選型時,,往往會結合業(yè)務場景進行考慮。今天來聊一聊消息隊列可能會用到的 7 種消息場景,。 1 普通消息消息隊列最基礎的功能就是生產者發(fā)送消息,、Broker 保存消息,消費者來消費消息,,以此實現(xiàn)系統(tǒng)解耦,、削峰填谷的作用。 普通消息是消息隊列必備的消息類型,,也是系統(tǒng)使用場景最多的一種消息,。 2 順序消息順序消息是指生產者發(fā)送消息的順序和消費者消費消息的順序是一致的,。比如在一個電商場景,同一個用戶提交訂單,、訂單支付,、訂單出庫,這三個消息消費者需要按照順序來進行消費,。如下圖: 順序消息的實現(xiàn)并不容易,,原因如下:
要保證消息有序,需要滿足兩個條件:
如下圖: 上面第二個條件是比較容易實現(xiàn)的,一個分區(qū)綁定一個消費者就可以,,主要是第一個條件,。 在主流消息隊列的實現(xiàn)中,Kafka 和 Pulsar 的實現(xiàn)方式類似,,生產者給消息賦值一個 key,,對 key 做 Hash 運算來指定消息發(fā)送到哪一個分區(qū)。比如上面電商的例子,,對同一個用戶的一筆訂單,,提交訂單,、訂單支付,、訂單出庫這三個消息賦值同一個 key,就可以把這三條消息發(fā)送到同一個分區(qū),。 對于 RocketMQ,,生產者在發(fā)送消息的時候,可以通過 MessageQueueSelector 指定把消息投遞到那個 MessageQueue,,如下圖: 示例代碼如下: public static void main(String[] args) throws UnsupportedEncodingException { RabbitMQ 的實現(xiàn)是 Exchange 根據設置好的 Route Key 將數(shù)據路由到不同的 Queue 中,。示例代碼如下:
3 延時消息或者也叫定時消息,是指消息發(fā)送后不會立即被消費,,而是指定一個時間,,到時間后再消費,。經典的場景比如電商購物時,30 分鐘未支付訂單,,讓訂單自動失效,。 3.1 RocketMQ 實現(xiàn)RocketMQ 定義了 18 個延時級別,每個延時級別對應一個延時時間,。下面如果延遲級別是 3,,則消息會延遲 10s 才會拉取。 //MessageStoreConfig類 RocketMQ 的延時消息如下圖: 生產者把消費發(fā)送到 Broker 后,,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 這個 Topic,,然后調度任務會判斷是否到期,如果到期,,會把消息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,,這樣消費者就可以消費到了。 RocketMQ 的延時消息只支持最大兩個小時的延時,,不過 RocketMQ5.0 基于時間輪算法實現(xiàn)了定時消息,,解決了這個問題。 3.2 Pulsar 實現(xiàn)Pulsar 的實現(xiàn)如下圖: Pulsar 的延時消息首先會寫入一個 Delayed Message Tracker 的數(shù)據結構中,,Delayed Message Tracker 根據延時時間構建 delayed index 優(yōu)先級隊列,。消費者拉取消息時,首先去 Delayed Message Tracker 檢查是否有到期的消息,。如果有則直接拉取進行消費,。 3.3 RabbitMQ 實現(xiàn)RabbitMQ 的實現(xiàn)方式有兩種,一種是投遞到普通隊列都不消費,,等消息過期后被投遞到死信隊列,,消費者消費死信隊列。如下圖: 第二種方式是生產者發(fā)送消息時,,先發(fā)送到本地 Mnesia 數(shù)據庫,,消息到期后定時器再將消息投遞到 broker。 3.4 Kafka 實現(xiàn)Kafka 本身并沒有延時隊列,,不過可以通過生產者攔截器來實現(xiàn)消息延時發(fā)送,,也可以定義延時 Topic,利用類似 RocketMQ 的方案來實現(xiàn)延時消息,。 4 事務消息事務消息是指生產消息和消費消息滿足事務的特性,。 RabbitMQ 和 Kafka 的事務消息都是只支持生產消息的事務特性,即一批消息要不全部發(fā)送成功,,要不全部發(fā)送失敗,。 RabbitMQ 通過 Channel 來開啟事務消息,代碼如下:
Kafka 可以給多個生產者設置同一個事務 ID ,從而把多個 Topic ,、多個 Partition 放在一個事務中,,實現(xiàn)原子性寫入。 Pulsar 的事務消息對于事務語義的定義是:允許事件流應用將消費,、處理,、生產消息整個過程定義為一個原子操作??梢?,Pulsar 的事務消息可以覆蓋消息流整個過程。 RocketMQ 的事務消息是通過 half 消息來實現(xiàn)的,。以電商購物場景來看,,賬戶服務扣減賬戶金額后,發(fā)送消息給 Broker,,庫存服務來消費這條消息進行扣減庫存,。如下圖: 可見,RocketMQ 只能保證生產者發(fā)送消息和本地事務的原子性,,并不能保證消費消息的原子性,。 5 軌跡消息軌跡消息主要用于跟蹤消息的生命周期,當消息丟失時可以很方便地找出原因,。 軌跡消息也跟普通消息一樣,,也需要存儲和查詢,也會占用消息隊列的資源,,所以選擇軌跡消息要考慮下面幾點:
RabbitMQ Broker 實現(xiàn)了軌跡消息的功能,打開 Trace 開關,,就可以把軌跡消息發(fā)送到 amq.rabbitmq.trace 這個 exchange,,但是要考慮軌跡消息會不會給 Broker 造成 壓力進而導致消息積壓。RabbitMQ 的生產者和消費者都沒有實現(xiàn)軌跡消息,,需要開發(fā)者自己來實現(xiàn),。 RocketMQ 生產者、Broker 和消費者都實現(xiàn)了軌跡消息,,不過默認是關閉的,,需要手工開啟,。 使用軌跡消息,,需要考慮記錄哪些節(jié)點、存儲介質、性能,、查詢方式等問題,。 6 死信隊列在消息隊列中,死信隊列主要應對一些異常的情況,,如下圖: RocketMQ 實現(xiàn)了消費端的死信隊列,,當消費者消費失敗時,會進行重試,,如果重試 16 次還是失敗,,則這條消息會被發(fā)送到死信隊列。 RabbitMQ 實現(xiàn)了生產者和 Broker 的死信隊列,,下面三種情況,,消息會被發(fā)送到死信隊列:
RabbitMQ 消息變成死信消息后,,會被發(fā)送到死信交換機(Dead-Letter-Exchange),。 7 優(yōu)先級消息有一些業(yè)務場景下,我們需要優(yōu)先處理一些消息,,比如銀行里面的金卡客戶,、銀卡客戶優(yōu)先級高于普通客戶,他們的業(yè)務需要優(yōu)先處理,。如下圖: 主流消息隊列中,,RabbitMQ 是支持優(yōu)先級隊列的,代碼如下: ConnectionFactory factory=new ConnectionFactory(); 8 總結消息隊列技術選型,,要考慮的因素很多,,本文主要從業(yè)務場景來分析需要考慮的因素,同時技術上也需要考慮運維復雜度,、業(yè)務規(guī)模,、社區(qū)活躍度、學習成本等因素,。希望本文對你使用消息隊列有所幫助,。 |
|