久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

RocketMQ第四章:手把手教老婆實(shí)現(xiàn)-順序消息生產(chǎn)者和順序消息消費(fèi)者

 小虛竹 2021-11-30
技術(shù)活,該賞
關(guān)注+一鍵三連(點(diǎn)贊,評(píng)論,收藏)再看,養(yǎng)成好習(xí)慣

RocketMQ使用教程相關(guān)系列 目錄



第一節(jié):介紹

順序消息含義介紹

順序消息指的是可以按照消息的發(fā)送順序來消費(fèi)(FIFO),。RocketMQ可以嚴(yán)格的保證消息有序,可以分為分區(qū)有序或者全局有序,。

原理解析

在默認(rèn)的情況下消息發(fā)送會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);

而消費(fèi)消息的時(shí)候從多個(gè)queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。

但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序,。如下圖所示:

當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的,。

第二節(jié):順序消息-生產(chǎn)者和消息者步驟說明

順序消息生產(chǎn)者代碼實(shí)現(xiàn)步驟

1.創(chuàng)建消息生產(chǎn)者producer并制定生產(chǎn)者組名

2.指定Nameserver地址

3.啟動(dòng)producer

4.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體

5.發(fā)送消息,選擇的send方法有三個(gè)參數(shù):

  • * 參數(shù)一:消息對(duì)象
  • * 參數(shù)二:消息隊(duì)列的選擇器
  • * 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí) 6.關(guān)閉生產(chǎn)者producer

順序消息消費(fèi)者代碼實(shí)現(xiàn)步驟

1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名

2.指定Nameserver地址

3.訂閱主題Topic和Tag

4.設(shè)置回調(diào)函數(shù),處理消息:與普通消息的差別,這里用的是MessageListenerOrderly

5.啟動(dòng)消費(fèi)者consumer

注意:消費(fèi)者的 Topic 和 Tag 需要和生產(chǎn)者保持一致

第三節(jié):順序消息生產(chǎn)者

public class Producer {

   public static void main(String[] args) throws Exception {
      // 1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.啟動(dòng)producer
      producer.start();
      System.out.println("生產(chǎn)者啟動(dòng)");
      for (int i = 0; i < 20; i++) {
         // 4.創(chuàng)建消息對(duì)象,指定主題Topic,、Tag和消息體

         Message msg = new Message("Topic_order_demo", "Tag_order_demo",
               ("Hello 虛竹,這是順序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         // 5.發(fā)送消息
         /**
         * 參數(shù)一:消息對(duì)象
         * 參數(shù)二:消息隊(duì)列的選擇器
         * 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí)
         */
         SendResult result = producer.send(msg, new MessageQueueSelector() {
            /**
            *
            * @param mqs:隊(duì)列集合
            * @param msg:消息對(duì)象
            * @param arg:業(yè)務(wù)標(biāo)識(shí)的參數(shù)
            * @return
            */
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
               Integer index = (Integer) arg;
               return mqs.get(index);
            }
         }, 1);
         System.out.println("發(fā)送結(jié)果:" + msg.toString());
      }

      // 6.關(guān)閉生產(chǎn)者producer
      producer.shutdown();
      System.out.println("生產(chǎn)者關(guān)閉");
   }
}

效果:

0

第四節(jié):順序消息消費(fèi)者

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        //1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.88.131:9876");
        
        //消息拉取最大條數(shù)
        consumer.setConsumeMessageBatchMaxSize(2);
        //3.訂閱主題Topic和Tag
        consumer.subscribe("Topic_order_demo", "*");

        //4.設(shè)置回調(diào)函數(shù),處理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //接受消息內(nèi)容
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {

                    try {
                        //獲取主題
                        String topic = msg.getTopic();
                        //獲取標(biāo)簽
                        String tags = msg.getTags();
                        //獲取信息
                        byte[] body =  msg.getBody();
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消費(fèi)信息:topic:" + topic + ",tags:"+tags+
                                ",result"+ result);

                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //重試
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.啟動(dòng)消費(fèi)者consumer
        consumer.start();
    }
}

效果:

0

    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多