久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

消息隊列技術選型:這 7 種消息場景一定要考慮,!

 黃爸爸好 2023-09-27 發(fā)布于浙江

大家好,我是君哥,。

我們在做消息隊列的技術選型時,,往往會結合業(yè)務場景進行考慮。今天來聊一聊消息隊列可能會用到的 7 種消息場景,。

1 普通消息

消息隊列最基礎的功能就是生產者發(fā)送消息,、Broker 保存消息,消費者來消費消息,,以此實現(xiàn)系統(tǒng)解耦,、削峰填谷的作用。

圖片

普通消息是消息隊列必備的消息類型,,也是系統(tǒng)使用場景最多的一種消息,。

2 順序消息

順序消息是指生產者發(fā)送消息的順序和消費者消費消息的順序是一致的,。比如在一個電商場景,同一個用戶提交訂單,、訂單支付,、訂單出庫,這三個消息消費者需要按照順序來進行消費,。如下圖:

圖片

順序消息的實現(xiàn)并不容易,,原因如下:

  • 生產者集群中,有多個生產者發(fā)送消息,,網絡延遲不一樣,,很難保證發(fā)送到 Broker 的消息落盤順序是一致的;
  • 如果 Broker 有多個分區(qū)或隊列,,生產者發(fā)送的消息會進入多個分區(qū),,也無法保證順序消費;
  • 如果有多個消費者來異步消費同一個分區(qū),,很難保證消費順序跟生產者發(fā)送順序一致,。

要保證消息有序,需要滿足兩個條件:

  • 同一個生產者必須同步發(fā)送消息到同一個分區(qū),;
  • 一個分區(qū)只能給同一個消費者消費,。

如下圖:

圖片

上面第二個條件是比較容易實現(xiàn)的,一個分區(qū)綁定一個消費者就可以,,主要是第一個條件,。

在主流消息隊列的實現(xiàn)中,Kafka 和 Pulsar 的實現(xiàn)方式類似,,生產者給消息賦值一個 key,,對 key 做 Hash 運算來指定消息發(fā)送到哪一個分區(qū)。比如上面電商的例子,,對同一個用戶的一筆訂單,,提交訂單,、訂單支付,、訂單出庫這三個消息賦值同一個 key,就可以把這三條消息發(fā)送到同一個分區(qū),。

對于 RocketMQ,,生產者在發(fā)送消息的時候,可以通過 MessageQueueSelector 指定把消息投遞到那個 MessageQueue,,如下圖:

圖片

示例代碼如下:

public static void main(String[] args) throws UnsupportedEncodingException {
 try {
  DefaultMQProducer producer = new DefaultMQProducer('please_rename_unique_group_name');
  producer.start();

  String[] tags = new String[] {'TagA''TagB''TagC''TagD''TagE'};
  for (int i = 0; i < 100; i++) {
   int orderId = i % 10;
   Message msg =
    new Message('TopicTestjjj', tags[i % tags.length], 'KEY' + i,
     ('Hello RocketMQ ' + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
     Integer id = (Integer) arg;
     int index = id % mqs.size();
     return mqs.get(index);
    }
   }, orderId);

   System.out.printf('%s%n', sendResult);
  }

  producer.shutdown();
 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  e.printStackTrace();
 }
}

RabbitMQ 的實現(xiàn)是 Exchange 根據設置好的 Route Key 將數(shù)據路由到不同的 Queue 中,。示例代碼如下:

@Resource
private AmqpTemplate rabbitTemplate;

public void send1(String message) {
 rabbitTemplate.convertAndSend('testExchange''testRoutingKey', message);
}

3 延時消息

或者也叫定時消息,是指消息發(fā)送后不會立即被消費,,而是指定一個時間,,到時間后再消費,。經典的場景比如電商購物時,30 分鐘未支付訂單,,讓訂單自動失效,。

3.1 RocketMQ 實現(xiàn)

RocketMQ 定義了 18 個延時級別,每個延時級別對應一個延時時間,。下面如果延遲級別是 3,,則消息會延遲 10s 才會拉取。

//MessageStoreConfig類
private String messageDelayLevel = '1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h';

RocketMQ 的延時消息如下圖:

圖片

生產者把消費發(fā)送到 Broker 后,,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 這個 Topic,,然后調度任務會判斷是否到期,如果到期,,會把消息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,,這樣消費者就可以消費到了。

RocketMQ 的延時消息只支持最大兩個小時的延時,,不過 RocketMQ5.0 基于時間輪算法實現(xiàn)了定時消息,,解決了這個問題。

3.2 Pulsar 實現(xiàn)

Pulsar 的實現(xiàn)如下圖:

圖片

Pulsar 的延時消息首先會寫入一個 Delayed Message Tracker 的數(shù)據結構中,,Delayed Message Tracker 根據延時時間構建 delayed index 優(yōu)先級隊列,。消費者拉取消息時,首先去 Delayed Message Tracker 檢查是否有到期的消息,。如果有則直接拉取進行消費,。

3.3 RabbitMQ 實現(xiàn)

RabbitMQ 的實現(xiàn)方式有兩種,一種是投遞到普通隊列都不消費,,等消息過期后被投遞到死信隊列,,消費者消費死信隊列。如下圖:

圖片

第二種方式是生產者發(fā)送消息時,,先發(fā)送到本地 Mnesia 數(shù)據庫,,消息到期后定時器再將消息投遞到 broker。

3.4 Kafka 實現(xiàn)

Kafka 本身并沒有延時隊列,,不過可以通過生產者攔截器來實現(xiàn)消息延時發(fā)送,,也可以定義延時 Topic,利用類似 RocketMQ 的方案來實現(xiàn)延時消息,。

4 事務消息

事務消息是指生產消息和消費消息滿足事務的特性,。

RabbitMQ 和 Kafka 的事務消息都是只支持生產消息的事務特性,即一批消息要不全部發(fā)送成功,,要不全部發(fā)送失敗,。

RabbitMQ 通過 Channel 來開啟事務消息,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
//開啟事務
channel.txSelect();
channel.basicPublish('directTransactionExchange','transactionRoutingKey',null,message.getBytes('utf-8'));
//提交事務 或者 channel.txRollback()回滾事務
channel.txCommit();

Kafka 可以給多個生產者設置同一個事務 ID ,從而把多個 Topic ,、多個 Partition 放在一個事務中,,實現(xiàn)原子性寫入。

Pulsar 的事務消息對于事務語義的定義是:允許事件流應用將消費,、處理,、生產消息整個過程定義為一個原子操作??梢?,Pulsar 的事務消息可以覆蓋消息流整個過程。

RocketMQ 的事務消息是通過 half 消息來實現(xiàn)的,。以電商購物場景來看,,賬戶服務扣減賬戶金額后,發(fā)送消息給 Broker,,庫存服務來消費這條消息進行扣減庫存,。如下圖:

圖片

可見,RocketMQ 只能保證生產者發(fā)送消息和本地事務的原子性,,并不能保證消費消息的原子性,。

5 軌跡消息

軌跡消息主要用于跟蹤消息的生命周期,當消息丟失時可以很方便地找出原因,。

軌跡消息也跟普通消息一樣,,也需要存儲和查詢,也會占用消息隊列的資源,,所以選擇軌跡消息要考慮下面幾點:

  • 消息生命周期的關鍵節(jié)點一定要記錄,;
  • 不能影響正常消息的發(fā)送和消費性能;
  • 不能影響 Broker 的消息存儲性能,;
  • 要考慮消息查詢維度和性能,。

RabbitMQ Broker 實現(xiàn)了軌跡消息的功能,打開 Trace 開關,,就可以把軌跡消息發(fā)送到 amq.rabbitmq.trace 這個 exchange,,但是要考慮軌跡消息會不會給 Broker 造成 壓力進而導致消息積壓。RabbitMQ 的生產者和消費者都沒有實現(xiàn)軌跡消息,,需要開發(fā)者自己來實現(xiàn),。

RocketMQ 生產者、Broker 和消費者都實現(xiàn)了軌跡消息,,不過默認是關閉的,,需要手工開啟,。

使用軌跡消息,,需要考慮記錄哪些節(jié)點、存儲介質、性能,、查詢方式等問題,。

6 死信隊列

在消息隊列中,死信隊列主要應對一些異常的情況,,如下圖:

圖片

RocketMQ 實現(xiàn)了消費端的死信隊列,,當消費者消費失敗時,會進行重試,,如果重試 16 次還是失敗,,則這條消息會被發(fā)送到死信隊列。

RabbitMQ 實現(xiàn)了生產者和 Broker 的死信隊列,,下面三種情況,,消息會被發(fā)送到死信隊列:

  • 生產者發(fā)送消息被拒絕,并且 requeue 參數(shù)設置為 false,;
  • Broker 消息過期了,;
  • 隊列達到最大長度。

RabbitMQ 消息變成死信消息后,,會被發(fā)送到死信交換機(Dead-Letter-Exchange),。

7 優(yōu)先級消息

有一些業(yè)務場景下,我們需要優(yōu)先處理一些消息,,比如銀行里面的金卡客戶,、銀卡客戶優(yōu)先級高于普通客戶,他們的業(yè)務需要優(yōu)先處理,。如下圖:

圖片

主流消息隊列中,,RabbitMQ 是支持優(yōu)先級隊列的,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
//設置優(yōu)先級為 5
args.put('x-max-priority'5);
channel.queueDeclare('my-priority-queue'truefalsefalse, args);

8 總結

消息隊列技術選型,,要考慮的因素很多,,本文主要從業(yè)務場景來分析需要考慮的因素,同時技術上也需要考慮運維復雜度,、業(yè)務規(guī)模,、社區(qū)活躍度、學習成本等因素,。希望本文對你使用消息隊列有所幫助,。

    本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發(fā)布,,不代表本站觀點,。請注意甄別內容中的聯(lián)系方式、誘導購買等信息,,謹防詐騙,。如發(fā)現(xiàn)有害或侵權內容,請點擊一鍵舉報,。
    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多