技術(shù)活,該賞
關(guān)注+一鍵三連(點(diǎn)贊,評(píng)論,收藏)再看,養(yǎng)成好習(xí)慣
RocketMQ使用教程相關(guān)系列 目錄
第一節(jié):介紹
順序消息含義介紹
順序消息指的是可以按照消息的發(fā)送順序來消費(fèi)(FIFO),。RocketMQ可以嚴(yán)格的保證消息有序,可以分為分區(qū)有序或者全局有序,。
原理解析
在默認(rèn)的情況下消息發(fā)送會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);
而消費(fèi)消息的時(shí)候從多個(gè)queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。
但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序,。如下圖所示:
當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的,。
第二節(jié):順序消息-生產(chǎn)者和消息者步驟說明
順序消息生產(chǎn)者代碼實(shí)現(xiàn)步驟
1.創(chuàng)建消息生產(chǎn)者producer并制定生產(chǎn)者組名
2.指定Nameserver地址
3.啟動(dòng)producer
4.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
5.發(fā)送消息,選擇的send方法有三個(gè)參數(shù):
- * 參數(shù)一:消息對(duì)象
- * 參數(shù)二:消息隊(duì)列的選擇器
- * 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí) 6.關(guān)閉生產(chǎn)者producer
順序消息消費(fèi)者代碼實(shí)現(xiàn)步驟
1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名
2.指定Nameserver地址
3.訂閱主題Topic和Tag
4.設(shè)置回調(diào)函數(shù),處理消息:與普通消息的差別,這里用的是MessageListenerOrderly
5.啟動(dòng)消費(fèi)者consumer
注意:消費(fèi)者的 Topic 和 Tag 需要和生產(chǎn)者保持一致
第三節(jié):順序消息生產(chǎn)者
public class Producer {
public static void main(String[] args) throws Exception {
// 1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
// 2.指定Nameserver地址
producer.setNamesrvAddr("192.168.88.131:9876");
// 3.啟動(dòng)producer
producer.start();
System.out.println("生產(chǎn)者啟動(dòng)");
for (int i = 0; i < 20; i++) {
// 4.創(chuàng)建消息對(duì)象,指定主題Topic,、Tag和消息體
Message msg = new Message("Topic_order_demo", "Tag_order_demo",
("Hello 虛竹,這是順序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5.發(fā)送消息
/**
* 參數(shù)一:消息對(duì)象
* 參數(shù)二:消息隊(duì)列的選擇器
* 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí)
*/
SendResult result = producer.send(msg, new MessageQueueSelector() {
/**
*
* @param mqs:隊(duì)列集合
* @param msg:消息對(duì)象
* @param arg:業(yè)務(wù)標(biāo)識(shí)的參數(shù)
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer index = (Integer) arg;
return mqs.get(index);
}
}, 1);
System.out.println("發(fā)送結(jié)果:" + msg.toString());
}
// 6.關(guān)閉生產(chǎn)者producer
producer.shutdown();
System.out.println("生產(chǎn)者關(guān)閉");
}
}
效果:
第四節(jié):順序消息消費(fèi)者
public class Consumer {
public static void main(String[] args) throws Exception {
//1.創(chuàng)建消費(fèi)者Consumer,制定消費(fèi)者組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.88.131:9876");
//消息拉取最大條數(shù)
consumer.setConsumeMessageBatchMaxSize(2);
//3.訂閱主題Topic和Tag
consumer.subscribe("Topic_order_demo", "*");
//4.設(shè)置回調(diào)函數(shù),處理消息
consumer.registerMessageListener(new MessageListenerOrderly() {
//接受消息內(nèi)容
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : msgs) {
try {
//獲取主題
String topic = msg.getTopic();
//獲取標(biāo)簽
String tags = msg.getTags();
//獲取信息
byte[] body = msg.getBody();
String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消費(fèi)信息:topic:" + topic + ",tags:"+tags+
",result"+ result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
//重試
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//5.啟動(dòng)消費(fèi)者consumer
consumer.start();
}
}
效果: