久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

EQueue

 ThinkTank_引擎 2014-12-24

前言


本文想介紹一下前段時(shí)間在寫(xiě)enode時(shí),,順便實(shí)現(xiàn)的一個(gè)分布式消息隊(duì)列equeue。這個(gè)消息隊(duì)列的思想不是我想出來(lái)的,,而是通過(guò)學(xué)習(xí)阿里的rocketmq后,,自己用c#實(shí)現(xiàn)了一個(gè)輕量級(jí)的簡(jiǎn)單版本。一方面可以通過(guò)寫(xiě)這個(gè)隊(duì)列讓自己更深入的掌握消息隊(duì)列的一些常見(jiàn)問(wèn)題,;另一方面也可以用來(lái)和enode集成,,為enode中的command和domain event的消息傳遞提供支持。目前在.net平臺(tái),,比較好用的消息隊(duì)列,,最常見(jiàn)的是微軟的MSMQ了吧,還有像rabbitmq也有.net的client端,。這些消息隊(duì)列都很強(qiáng)大和成熟,。但當(dāng)我學(xué)習(xí)了kafka以及阿里的rocketmq(早期版本叫metaq,自metaq 3.0后改名為rocketmq)后,,覺(jué)得rocketmq的設(shè)計(jì)思想深深吸引了我,,因?yàn)槲也粌H能理解其思想,還有其完整的源代碼可以學(xué)習(xí),。但是rocketmq是java寫(xiě)的,,且目前還沒(méi)有.net的client端,,所以不能直接使用(有興趣的朋友可以為其寫(xiě)一個(gè).net的client端),所以在學(xué)習(xí)了rocketmq的設(shè)計(jì)文檔以及大部分代碼后,,決定自己用c#寫(xiě)一個(gè)出來(lái),。


項(xiàng)目開(kāi)源地址:https://github.com/tangxuehua/equeue,項(xiàng)目中包含了隊(duì)列的全部源代碼以及如何使用的示例,。也可以在enode項(xiàng)目中看到如何使用,。


EQueue消息隊(duì)列中的專(zhuān)業(yè)術(shù)語(yǔ)


Topic


一個(gè)topic就是一個(gè)主題。一個(gè)系統(tǒng)中,,我們可以對(duì)消息劃分為一些topic,,這樣我們就能通過(guò)topic,將消息發(fā)送到不同的queue,。


Queue


一個(gè)topic下,,我們可以設(shè)置多個(gè)queue,,每個(gè)queue就是我們平時(shí)所說(shuō)的消息隊(duì)列,;因?yàn)閝ueue是完全從屬于某個(gè)特定的topic的,所以當(dāng)我們要發(fā)送消息時(shí),,總是要指定該消息所屬的topic是什么,。然后equeue就能知道該topic下有幾個(gè)queue了。但是到底發(fā)送到哪個(gè)queue呢,?比如一個(gè)topic下有4個(gè)queue,,那對(duì)于這個(gè)topic下的消息,發(fā)送時(shí),,到底該發(fā)送到哪個(gè)queue呢,?那必定有個(gè)消息被路由的過(guò)程。目前equeue的做法是在發(fā)送一個(gè)消息時(shí),,需要用戶(hù)指定這個(gè)消息對(duì)應(yīng)的topic以及一個(gè)用來(lái)路由的一個(gè)object類(lèi)型的參數(shù),。equeue會(huì)根據(jù)topic得到所有的queue,然后根據(jù)該object參數(shù)通過(guò)hash code然后取模queue的個(gè)數(shù)最后得到要發(fā)送的queue的編號(hào),,從而知道該發(fā)送到哪個(gè)queue,。這個(gè)路由消息的過(guò)程是在發(fā)送消息的這一方做的,也就是下面要說(shuō)的producer,。之所以不在消息服務(wù)器上做是因?yàn)檫@樣可以讓用戶(hù)自己決定該如何路由消息,,具有更大的靈活性。


Producer


就是消息隊(duì)列的生產(chǎn)者,。我們知道,,消息隊(duì)列的本質(zhì)就是實(shí)現(xiàn)了publish-subscribe的模式,即生產(chǎn)者-消費(fèi)者模式,。生產(chǎn)者生產(chǎn)消息,,消費(fèi)者消費(fèi)消息,。所以這里的Producer就是用來(lái)生產(chǎn)和發(fā)送消息的。


Consumer


就是消息隊(duì)列的消費(fèi)者,,一個(gè)消息可以有多個(gè)消費(fèi)者,。


Consumer Group


消費(fèi)者分組,這可能對(duì)大家來(lái)說(shuō)是一個(gè)新概念,。之所以要搞出一個(gè)消費(fèi)者分組,,是為了實(shí)現(xiàn)下面要說(shuō)的集群消費(fèi)。一個(gè)消費(fèi)者分組中包含了一些消費(fèi)者,,如果這些消費(fèi)者是要集群消費(fèi),,那這些消費(fèi)者會(huì)平均消費(fèi)該分組中的消息。


Broker


equeue中的broker負(fù)責(zé)消息的中轉(zhuǎn),,即接收producer發(fā)送過(guò)來(lái)的消息,,然后持久化消息到磁盤(pán),然后接收consumer發(fā)送過(guò)來(lái)的拉取消息的請(qǐng)求,,然后根據(jù)請(qǐng)求拉取相應(yīng)的消息給consumer,。所以,broker可以理解為消息隊(duì)列服務(wù)器,,提供消息的接收,、存儲(chǔ)、拉取服務(wù),??梢?jiàn),broker對(duì)于equeue來(lái)說(shuō)是核心,,它絕對(duì)不能掛,,一旦掛了,那producer,,consumer就無(wú)法實(shí)現(xiàn)publish-subscribe了,。


集群消費(fèi)


集群消費(fèi)是指,一個(gè)consumer group下的consumer,,平均消費(fèi)topic下的queue,。具體如何平均可以看一下下面的架構(gòu)圖,這里先用文字簡(jiǎn)單描述一下,。假如一個(gè)topic下有4個(gè)queue,,然后當(dāng)前有一個(gè)consumer group,該分組下有4個(gè)consumer,,那每個(gè)consumer就被分配到該topic下的一個(gè)queue,,這樣就達(dá)到了平均消費(fèi)topic下的queue的目的。如果consumer group下只有兩個(gè)consumer,,那每個(gè)consumer就消費(fèi)2個(gè)queue,。如果有3個(gè)consumer,,則第一個(gè)消費(fèi)2個(gè)queue,后面兩個(gè)每個(gè)消費(fèi)一個(gè)queue,,從而達(dá)到盡量平均消費(fèi),。所以,可以看出,,我們應(yīng)該盡量讓consumer group下的consumer的數(shù)目和topic的queue的數(shù)目一致或成倍數(shù)關(guān)系,。這樣每個(gè)consumer消費(fèi)的queue的數(shù)量總是一樣的,這樣每個(gè)consumer服務(wù)器的壓力才會(huì)差不多,。當(dāng)前前提是這個(gè)topic下的每個(gè)queue里的消息的數(shù)量總是差不多多的,。這點(diǎn)我們可以對(duì)消息根據(jù)某個(gè)用戶(hù)自己定義的key來(lái)進(jìn)行hash路由來(lái)保證。


廣播消費(fèi)


廣播消費(fèi)是指一個(gè)consumer只要訂閱了某個(gè)topic的消息,,那它就會(huì)收到該topic下的所有queue里的消息,,而不管這個(gè)consumer的group是什么。所以對(duì)于廣播消費(fèi)來(lái)說(shuō),,consumer group沒(méi)什么實(shí)際意義,。consumer可以在實(shí)例化時(shí),我們可以指定是集群消費(fèi)還是廣播消費(fèi),。


消費(fèi)進(jìn)度(offset)


消費(fèi)進(jìn)度是指,,當(dāng)一個(gè)consumer group里的consumer在消費(fèi)某個(gè)queue里的消息時(shí),,equeue是通過(guò)記錄消費(fèi)位置(offset)來(lái)知道當(dāng)前消費(fèi)到哪里了,。以便該consumer重啟后繼續(xù)從該位置開(kāi)始消費(fèi)。比如一個(gè)topic有4個(gè)queue,,一個(gè)consumer group有4個(gè)consumer,,則每個(gè)consumer分配到一個(gè)queue,然后每個(gè)consumer分別消費(fèi)自己的queue里的消息,。equeue會(huì)分別記錄每個(gè)consumer對(duì)其queue的消費(fèi)進(jìn)度,,從而保證每個(gè)consumer重啟后知道下次從哪里開(kāi)始繼續(xù)消費(fèi)。實(shí)際上,,也許下次重啟后不是由該consumer消費(fèi)該queue了,,而是由group里的其他consumer消費(fèi)了,這樣也沒(méi)關(guān)系,,因?yàn)槲覀円呀?jīng)記錄了這個(gè)queue的消費(fèi)位置了,。所以可以看出,消費(fèi)位置和consumer其實(shí)無(wú)關(guān),,消費(fèi)位置完全是queue的一個(gè)屬性,,用來(lái)記錄當(dāng)前被消費(fèi)到哪里了。另外一點(diǎn)很重要的是,,一個(gè)topic可以被多個(gè)consumer group里的consumer訂閱,。不同consumer group里的consumer即便是消費(fèi)同一個(gè)topic下的同一個(gè)queue,,那消費(fèi)進(jìn)度也是分開(kāi)存儲(chǔ)的。也就是說(shuō),,不同的consumer group內(nèi)的consumer的消費(fèi)完全隔離,,彼此不受影響。還有一點(diǎn)就是,,對(duì)于集群消費(fèi)和廣播消費(fèi),,消費(fèi)進(jìn)度持久化的地方是不同的,集群消費(fèi)的消費(fèi)進(jìn)度是放在broker,,也就是消息隊(duì)列服務(wù)器上的,,而廣播消費(fèi)的消費(fèi)進(jìn)度是存儲(chǔ)在consumer本地磁盤(pán)上的。之所以這樣設(shè)計(jì)是因?yàn)?,?duì)于集群消費(fèi),,由于一個(gè)queue的消費(fèi)者可能會(huì)更換,因?yàn)閏onsumer group下的consumer數(shù)量可能會(huì)增加或減少,,然后就會(huì)重新計(jì)算每個(gè)consumer該消費(fèi)的queue是哪些,,這個(gè)能理解的把?所以,,當(dāng)出現(xiàn)一個(gè)queue的consumer變動(dòng)的時(shí)候,,新的consumer如何知道該從哪里開(kāi)始消費(fèi)這個(gè)queue呢?如果這個(gè)queue的消費(fèi)進(jìn)度是存儲(chǔ)在前一個(gè)consumer服務(wù)器上的,,那就很難拿到這個(gè)消費(fèi)進(jìn)度了,,因?yàn)橛锌赡苣莻€(gè)服務(wù)器已經(jīng)掛了,或者下架了,,都有可能,。而因?yàn)閎roker對(duì)于所有的consumer總是在服務(wù)的,所以,,在集群消費(fèi)的情況下,,被訂閱的topic的queue的消費(fèi)位置是存儲(chǔ)在broker上的,存儲(chǔ)的時(shí)候按照不同的consumer group做隔離,,以確保不同的consumer group下的consumer的消費(fèi)進(jìn)度互補(bǔ)影響,。然后,對(duì)于廣播消費(fèi),,由于不會(huì)出現(xiàn)一個(gè)queue的consumer會(huì)變動(dòng)的情況,,所以我們沒(méi)必要讓broker來(lái)保存消費(fèi)位置,所以是保存在consumer自己的服務(wù)器上,。


EQueue是什么,?



通過(guò)上圖,我們能直觀的理解equeue。這個(gè)圖是從rocketmq的設(shè)計(jì)文檔中拿來(lái)的,,呵呵,。由于equeue的設(shè)計(jì)思想完全和rocketmq一致,所以我就拿過(guò)來(lái)用了,。每個(gè)producer可以向某個(gè)topic發(fā)消息,,發(fā)送的時(shí)候根據(jù)某種路由策略(producer可自定義)將消息發(fā)送到某個(gè)特定的queue。然后consumer可以消費(fèi)特定topic下的queue里的消息,。上圖中,,TOPIC_A有兩個(gè)消費(fèi)者,這兩個(gè)消費(fèi)者是在一個(gè)group里,,所以應(yīng)該平均消費(fèi)TOPIC_A下的queue但由于有三個(gè)queue,,所以第一個(gè)consumer分到了2個(gè)queue,第二個(gè)consumer分到了1個(gè),。對(duì)于TOPIC_B,,由于只有一個(gè)消費(fèi)者,那TOPIC_B下的所有queue都由它消費(fèi),。所有的topic信息,、queue信息、還有消息本身,,都存儲(chǔ)在broker服務(wù)器上,。這點(diǎn)上圖中沒(méi)有體現(xiàn)出來(lái)。上圖主要關(guān)注producer,consumer,topic,queue這四個(gè)東西之間的關(guān)系,,并不關(guān)注物理服務(wù)器的部署架構(gòu),。


關(guān)鍵問(wèn)題的思考


1.Producer,Broker,Consumer三者之間如何通信


由于是用c#實(shí)現(xiàn),且因?yàn)橐话闶窃诰钟蚓W(wǎng)內(nèi)部署,,為了實(shí)現(xiàn)高性能通信,,我們可以利用異步socket來(lái)通信,。.net本身提供了很好的異步socket通信的支持,;我們也可以用zeromq來(lái)實(shí)現(xiàn)高性能的socket通信。本來(lái)想直接使用zeromq來(lái)實(shí)現(xiàn)通信模塊就好了,,但后來(lái)自己學(xué)習(xí)了一下.net自帶的socket通信相關(guān)知識(shí),,發(fā)現(xiàn)也不難,所以就自己實(shí)現(xiàn)了一個(gè),,呵呵,。自己實(shí)現(xiàn)的好處是我可以自己定義消息的協(xié)議,目前這部分實(shí)現(xiàn)代碼在ecommon基礎(chǔ)類(lèi)庫(kù)中,,是一個(gè)獨(dú)立的可服用的與業(yè)務(wù)場(chǎng)景無(wú)關(guān)的基礎(chǔ)類(lèi)庫(kù),。有興趣的可以去下載下來(lái)看看代碼。經(jīng)過(guò)了自己的一些性能測(cè)試,發(fā)現(xiàn)通信模塊的性能還是不錯(cuò)的,。一臺(tái)broker,,四臺(tái)producer同時(shí)向這個(gè)broker發(fā)送消息,每秒能發(fā)送的消息4W沒(méi)有問(wèn)題,,更多的producer還沒(méi)測(cè)試,。


2.消息如何持久化


消息持久化方面主要考慮的是性能問(wèn)題,還有就是消息如何快速的讀取,。


1. 首先,,一臺(tái)broker上的消息不需要一直保存在該broker服務(wù)器上,因?yàn)檫@些消息總會(huì)被消費(fèi)掉,。根據(jù)阿里rocketmq的設(shè)計(jì),,默認(rèn)會(huì)1天刪除一次已經(jīng)被消費(fèi)過(guò)的消息。所以,,我們可以理解,,broker上的消息應(yīng)該不會(huì)無(wú)限制增長(zhǎng),因?yàn)闀?huì)被定期刪除,。所以不必考慮一臺(tái)broker上消息放不下的問(wèn)題,。


2. 如何快速的持久化消息?一般來(lái)說(shuō),,我覺(jué)得有兩種方式:1)順序?qū)懘疟P(pán)文件,;2)用現(xiàn)成的key,value的nosql產(chǎn)品來(lái)存儲(chǔ);rocketmq目前用的是自己寫(xiě)文件的方式,,這種方式的難點(diǎn)是寫(xiě)文件比較復(fù)雜,,因?yàn)樗邢⒍际琼樞騛ppend到文件末尾,雖然性能非常高,,但復(fù)雜度也很高,;比如所有消息不能全寫(xiě)在一個(gè)文件里,一個(gè)文件到達(dá)一定大小后需要拆分,,一旦拆分就會(huì)產(chǎn)生很多問(wèn)題,,呵呵。拆分后如何讀取也是比較復(fù)雜的問(wèn)題,。還有由于是順序?qū)懭胛募?,那我們還需要把每一個(gè)消息在文件中的起始位置和長(zhǎng)度需要記錄下來(lái),這樣consumer在消費(fèi)消息時(shí),,才能根據(jù)offset從文件中拿到該消息,。總之需要考慮的問(wèn)題很多,。如果是用nosql來(lái)持久化消息,,那可以省去我們寫(xiě)文件時(shí)遇到的各種問(wèn)題,,我們只需要關(guān)心如何把消息的key和該消息在queue中的offset對(duì)應(yīng)起來(lái)即可。另外一點(diǎn)疑問(wèn)是,,queue里的信息要持久化嗎,?先要想清楚queue里放的是什么東西。當(dāng)broker接收到一個(gè)消息后,,首先肯定是要先持久化,,完成后需要把消息放入queue里。但由于內(nèi)存很有限,,我們不可能把這個(gè)消息直接放入queue里,,我們其實(shí)要放的只需要時(shí)該消息在nosql里的key即可,或者如果是用文件來(lái)持久化,,那放的是該消息在文件中的偏移量offset,,即存儲(chǔ)在文件的那個(gè)位置(比如是哪個(gè)行號(hào))。所以,,實(shí)際上,,queue只是一個(gè)消息的索引。那有必要持久化queue嗎,?可以持久化,,這樣畢竟在broker重啟的時(shí)候,恢復(fù)queue的時(shí)間可以縮短,。那需要和持久化消息同步持久化嗎,?顯然不需要,我們可以異步定時(shí)持久化每個(gè)queue,,然后恢復(fù)queue的時(shí)候,,可以先從持久化的部分恢復(fù),然后再把剩下的部分通過(guò)持久化的消息來(lái)補(bǔ)充以達(dá)到queue因?yàn)楫惒匠志没牟糠挚梢宰菲?。所以,,?jīng)過(guò)上面的分析,消息本身都是放在nosql中,,queue全部在內(nèi)存中,。


那消息如何持久化呢?我覺(jué)得最好的辦法是讓每個(gè)消息有一個(gè)全局的順序號(hào),,一旦消息被寫(xiě)入nosql后,,該消息的全局順序號(hào)就確定了,然后我們?cè)诟聦?duì)應(yīng)的queue的信息時(shí),,把該消息的全局順序號(hào)傳給queue,這樣queue就能把queue自己對(duì)該消息的本地順序號(hào)和該消息的全局順序號(hào)建立映射關(guān)系,。相關(guān)代碼如下:


復(fù)制代碼

public MessageStoreResult StoreMessage(Message message, int queueId)
{
    var queues = GetQueues(message.Topic);
    var queueCount = queues.Count;
    if (queueId >= queueCount || queueId < 0)
    {
        throw new InvalidQueueIdException(message.Topic, queueCount, queueId);
    }
    var queue = queues[queueId];
    var queueOffset = queue.IncrementCurrentOffset();
    var storeResult = _messageStore.StoreMessage(message, queue.QueueId, queueOffset);
    queue.SetMessageOffset(queueOffset, storeResult.MessageOffset);
    return storeResult;
}

復(fù)制代碼

沒(méi)什么比代碼更能說(shuō)明問(wèn)題了,,呵呵。上的代碼的思路是,接收一個(gè)消息對(duì)象和一個(gè)queueId,,queueId表示當(dāng)前消息要放到第幾個(gè)queue里,。然后內(nèi)部邏輯是,先獲取該消息的topic的所有queue,,由于queue和topic都在內(nèi)存,,所以這里沒(méi)性能問(wèn)題。然后檢查一下當(dāng)前傳遞進(jìn)來(lái)的queueId是否合法,。如果合法,,那就定位到該queue,然后通過(guò)IncrementCurrentOffset方法,,將queue的內(nèi)部序號(hào)加1并返回,,然后持久化消息,持久化的時(shí)候把queueId以及queueOffset一起持久化,,完成后返回一個(gè)消息的全局序列號(hào),。由于messageStore內(nèi)部會(huì)把消息內(nèi)容、queueId,、queueOffset,,以及消息的全局順序號(hào)一起作為一個(gè)整體保存到nosql中,key就是消息的全局序列號(hào),,value就是前面說(shuō)的整體(被序列化為二進(jìn)制),。然后,在調(diào)用queue的SetMessageOffset方法,,把queueOffset和message的全局offset建立映射關(guān)系即可,。最后返回一個(gè)結(jié)果。messageStore.StoreMessage的內(nèi)存實(shí)現(xiàn)大致如下:


復(fù)制代碼

public MessageStoreResult StoreMessage(Message message, int queueId, long queueOffset)
{
    var offset = GetNextOffset();
    _queueCurrentOffsetDict[offset] = new QueueMessage(message.Topic, message.Body, offset, queueId, queueOffset, DateTime.Now);
    return new MessageStoreResult(offset, queueId, queueOffset);
}

復(fù)制代碼

GetNextOffset就是獲取下一個(gè)全局的消息序列號(hào),,QueueMessage就是上面所說(shuō)的“整體”,,因?yàn)槭莾?nèi)存實(shí)現(xiàn),所以就用了一個(gè)ConcurrentDictionary來(lái)保存一下queueMessage對(duì)象,。如果是用nosql來(lái)實(shí)現(xiàn)messageStore,,則這里需要寫(xiě)入nosql,key就是消息的全局序列號(hào),,value就是queueMessage的二進(jìn)制序列化數(shù)據(jù),。通過(guò)上面的分析我們可以知道我們會(huì)將消息的全局序列號(hào)+queueId+queueOffset一起整體作為一條記錄持久化起來(lái)。這樣做有兩個(gè)非常好的特性:1)實(shí)現(xiàn)了消息持久化和消息在queue中的位置的持久化的原子事務(wù),;2)我們總是可以根據(jù)這些持久化的queueMessage還原出所有的queue的信息,,因?yàn)閝ueueMessage里包含了消息和消息在queue的中的位置信息;


基于這樣的消息存儲(chǔ),,當(dāng)某個(gè)consumer要消費(fèi)某個(gè)位置的消息時(shí),,我們可以通過(guò)先通過(guò)queueId找到queue,,然后通過(guò)消息在queueOffset(由consumer傳遞過(guò)來(lái)的)獲取消息的全局offset,然后根據(jù)該全局的offset作為key從nosql拿到消息,。實(shí)際上現(xiàn)在的equeue是批量拉取消息的,,也就是一次socket請(qǐng)求不是拉一個(gè)消息,而是拉一批,,默認(rèn)是32個(gè)消息,。這樣consumer可以用更少的網(wǎng)絡(luò)請(qǐng)求拿到更多的消息,可以加快消息消費(fèi)的速度,。


3.Producer發(fā)送消息時(shí)的消息路由的細(xì)節(jié)


producer在發(fā)送消息時(shí),,如何知道當(dāng)前topic下有多少個(gè)queue呢?每次發(fā)送消息時(shí)都要去broker上查一下嗎,?顯然不行,,這樣發(fā)送消息的性能就上不去了。那怎么辦呢,?就是異步,,呵呵。producer可以定時(shí)向broker發(fā)送請(qǐng)求,,獲取topic下的queue數(shù)量,,然后保存起來(lái)。這樣每次producer在發(fā)送消息時(shí),,就只要從本地緩存里拿即可,。因?yàn)閎roker上topic的queue的數(shù)量一般不會(huì)變化,所以這樣的緩存很有意義,。那還有一個(gè)問(wèn)題,,當(dāng)前producer第一次對(duì)某個(gè)topic發(fā)送消息時(shí),queue哪里來(lái)呢,?因?yàn)槎〞r(shí)線程不知道要向broker拿哪個(gè)topic下的queue數(shù)量,,因?yàn)榇藭r(shí)producer端還沒(méi)有一個(gè)topic呢,因?yàn)橐粋€(gè)消息都還沒(méi)發(fā)送過(guò),。那就是需要判斷一下,,如果當(dāng)前topic沒(méi)有queue的count信息,則直接從broker上獲取queue的count信息,。然后再緩存起來(lái),,在發(fā)送當(dāng)前消息。然后第二次發(fā)送時(shí),,因?yàn)榫彺胬镆呀?jīng)有了該消息,,所以就不必再?gòu)腷roker拿了,且后續(xù)定時(shí)線程也會(huì)自動(dòng)去更新該topic下的queue的count了,。好,,producer有了topic的queue的count,,那用戶(hù)在發(fā)送消息時(shí),,框架就能把這個(gè)topic的queueCount傳遞給用戶(hù),,然后用戶(hù)就能根據(jù)自己的需要將消息路由到第幾個(gè)queue了。


4.consumer負(fù)載均衡如何實(shí)現(xiàn)


consumer負(fù)載均衡的意思是指,,在消費(fèi)者集群消費(fèi)的情況下,,如何讓同一個(gè)consumer group里的消費(fèi)者平均消費(fèi)同一個(gè)topic下的queue。所以這個(gè)負(fù)載均衡本質(zhì)上是一個(gè)將queue平均分配給consumer的過(guò)程,。那么怎么實(shí)現(xiàn)呢,?通過(guò)上面負(fù)載均衡的定義,我們只要,,要做負(fù)載均衡,,必須要確定consumer group和topic;然后拿到consumer group下的所有consumer,,以及topic下的所有queue,;然后對(duì)于當(dāng)前的consumer,就能計(jì)算出來(lái)當(dāng)前consumer應(yīng)該被分配到哪些queue了,。我們可以通過(guò)如下的函數(shù)來(lái)得到當(dāng)前的consumer應(yīng)該被分配到哪幾個(gè)queue,。


復(fù)制代碼

public class AverageAllocateMessageQueueStrategy : IAllocateMessageQueueStrategy
{
    public IEnumerable<MessageQueue> Allocate(string currentConsumerId, IList<MessageQueue> totalMessageQueues, IList<string> totalConsumerIds)
    {
        var result = new List<MessageQueue>();

        if (!totalConsumerIds.Contains(currentConsumerId))
        {
            return result;
        }

        var index = totalConsumerIds.IndexOf(currentConsumerId);
        var totalMessageQueueCount = totalMessageQueues.Count;
        var totalConsumerCount = totalConsumerIds.Count;
        var mod = totalMessageQueues.Count() % totalConsumerCount;
        var size = mod > 0 && index < mod ? totalMessageQueueCount / totalConsumerCount + 1 : totalMessageQueueCount / totalConsumerCount;
        var averageSize = totalMessageQueueCount <= totalConsumerCount ? 1 : size;
        var startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        var range = Math.Min(averageSize, totalMessageQueueCount - startIndex);

        for (var i = 0; i < range; i++)
        {
            result.Add(totalMessageQueues[(startIndex + i) % totalMessageQueueCount]);
        }

        return result;
    }
}

復(fù)制代碼

函數(shù)里的實(shí)現(xiàn)就不多分析了。這個(gè)函數(shù)的目的就是根據(jù)給定的輸入,,返回當(dāng)前consumer該分配到的queue,。分配的原則就是平均分配。好了,,有了這個(gè)函數(shù),,我們就能很方便的實(shí)現(xiàn)負(fù)載均衡了。我們可以對(duì)每一個(gè)正在運(yùn)行的consumer內(nèi)部開(kāi)一個(gè)定時(shí)job,,該job每隔一段時(shí)間進(jìn)行一次負(fù)載均衡,,也就是執(zhí)行一次上面的函數(shù),得到當(dāng)前consumer該綁定的最新queue,。因?yàn)槊總€(gè)consumer都有一個(gè)groupName屬性,,用于表示當(dāng)前consumer屬于哪個(gè)group。所以,,我們就可以在負(fù)載均衡時(shí)到broker獲取當(dāng)前group下的所有consumer,;另一方面,因?yàn)槊總€(gè)consumer都知道它自己訂閱了哪些topic,,所以有了topic信息,,就能獲取topic下的所有queue的信息了,有了這兩樣信息,,每個(gè)consumer就能自己做負(fù)載均衡了,。先看一下下面的代碼:



_scheduleService.ScheduleTask(Rebalance, Setting.RebalanceInterval, Setting.RebalanceInterval);
_scheduleService.ScheduleTask(UpdateAllTopicQueues, Setting.UpdateTopicQueueCountInterval, Setting.UpdateTopicQueueCountInterval);
_scheduleService.ScheduleTask(SendHeartbeat, Setting.HeartbeatBrokerInterval, Setting.HeartbeatBrokerInterval);


每個(gè)consumer內(nèi)部都會(huì)啟動(dòng)三個(gè)定時(shí)的task,,第一個(gè)task表示要定時(shí)做一次負(fù)載均衡;第二個(gè)task表示要定時(shí)更新當(dāng)前consumer訂閱的所有topic的queueCount信息,,并把最新的queueCount信息都保存在本地,;第三個(gè)task表示當(dāng)前consumer會(huì)向broker定時(shí)發(fā)送心跳,這樣broker就能通過(guò)心跳知道某個(gè)consumer是否還活著,,broker上維護(hù)了所有的consumer信息,。一旦有新增或者發(fā)現(xiàn)沒(méi)有及時(shí)發(fā)送心跳過(guò)來(lái)的consumer,就會(huì)認(rèn)為有新增或者死掉的consumer,。因?yàn)閎roker上維護(hù)了所有的consumer信息,,所以他就能提供查詢(xún)服務(wù),比如根據(jù)某個(gè)consumer group查詢(xún)?cè)揼roup下的consumer,。


通過(guò)這三個(gè)定時(shí)任務(wù),,就能完成消費(fèi)者的負(fù)載均衡了。先看一下Rebalance方法:


復(fù)制代碼

private void Rebalance()
{
    foreach (var subscriptionTopic in _subscriptionTopics)
    {
        try
        {
            RebalanceClustering(subscriptionTopic);
        }
        catch (Exception ex)
        {
            _logger.Error(string.Format("[{0}]: rebalanceClustering for topic [{1}] has exception", Id, subscriptionTopic), ex);
        }
    }
}

復(fù)制代碼

代碼很簡(jiǎn)單,,就是對(duì)每個(gè)訂閱的topic做負(fù)載均衡處理,。再看一下RebalanceClustering方法:


View Code

上面的代碼不多分析了,就是先根據(jù)consumer group和topic獲取所有的consumer,,然后對(duì)consumer做排序處理,。之所以要做排序處理是為了確保負(fù)載均衡時(shí)對(duì)已有的分配情況盡量不發(fā)生改變。接下來(lái)就是從本地獲取topic下的所有queue,,同樣根據(jù)queueId做一下排序,。然后就是調(diào)用上面的分配算法計(jì)算出當(dāng)前consumer應(yīng)該分配到哪些queue。最后調(diào)用UpdatePullRequestDict方法,,用來(lái)對(duì)新增或刪除的queue做處理,。對(duì)于新增的queue,要?jiǎng)?chuàng)建一個(gè)獨(dú)立的worker線程,,開(kāi)始從broker拉取消息,;對(duì)于刪除的queue,要停止其對(duì)應(yīng)的work,,停止拉取消息,。


通過(guò)上面的介紹和分析,我們大家知道了equeue是如何實(shí)現(xiàn)消費(fèi)者的負(fù)載均衡的,。我們可以看出,,因?yàn)槊總€(gè)topic下的queue的更新是異步的定時(shí)的,且負(fù)載均衡本身也是定時(shí)的,,且broker上維護(hù)的consumer的信息也不是事實(shí)的,,因?yàn)槊總€(gè)consumer發(fā)送心跳到broker不是實(shí)時(shí)發(fā)送的,而是比如每隔5s發(fā)送一次。所有這些因?yàn)槎际钱惒降脑O(shè)計(jì),,所以可能會(huì)導(dǎo)致在負(fù)載均衡的過(guò)程中,,同一個(gè)queue可能會(huì)被兩個(gè)消費(fèi)者同時(shí)消費(fèi)。這個(gè)就是所謂的,,我們只能做到一個(gè)消息至少被消費(fèi)一次,,但equeue層面做不到一個(gè)消息只會(huì)被消費(fèi)一次。實(shí)際上像rocketmq這種也是這樣的思路,,放棄一個(gè)消息只會(huì)被消費(fèi)一次的實(shí)現(xiàn)(因?yàn)榇鷥r(jià)太大,,且過(guò)于復(fù)雜,,實(shí)際上對(duì)于分布式的環(huán)境,,不太可能做到一個(gè)消息只會(huì)被消費(fèi)一次),而是采用確保一個(gè)消息至少會(huì)被消費(fèi)一次(即at least once).所以使用equeue,,應(yīng)用方要自己做好對(duì)每個(gè)消息的冪等處理,。


5.如何實(shí)現(xiàn)實(shí)時(shí)消息推送


消息的實(shí)時(shí)推送,一般有兩種做法:推模式(push)和拉模式(pull),。push的方式是指broker主動(dòng)對(duì)所有訂閱了該topic的消費(fèi)者推送消息,;pull的方式是指消費(fèi)者主動(dòng)到broker上拉取消息;對(duì)于推模式,,最大的好處就是實(shí)時(shí),,因?yàn)橐挥行碌南ⅲ蜁?huì)立即推送給消費(fèi)者,。但是有一個(gè)缺點(diǎn)就是如果消費(fèi)者來(lái)不及消費(fèi),,它也會(huì)給消費(fèi)者推消息,這樣就會(huì)導(dǎo)致消費(fèi)者端的消息會(huì)堵塞,。而通過(guò)拉的方式,,有兩種實(shí)現(xiàn):1)輪訓(xùn)的方式拉,比如每隔5s輪訓(xùn)一下是否有新消息,,這種方式的缺點(diǎn)是消息不實(shí)時(shí),,但是消費(fèi)進(jìn)度完全由消費(fèi)者自己把控了;2)開(kāi)長(zhǎng)連接的方式來(lái)拉,,就是不輪訓(xùn),,消費(fèi)者和broker之間一直保持的連接通道,然后broker一有新消息,,就會(huì)利用這個(gè)通道把消息發(fā)送給消費(fèi)者,。


equeue中目前采用的是通過(guò)長(zhǎng)連接拉取消息的方式。長(zhǎng)連接通過(guò)socket長(zhǎng)連接實(shí)現(xiàn),。但是雖然叫長(zhǎng)連接,,也不是一直不斷開(kāi),而是也會(huì)設(shè)計(jì)一個(gè)超時(shí)的限制,,比如一個(gè)長(zhǎng)連接最大不超過(guò)15s,,超過(guò)15s,,則broker發(fā)送回復(fù)給consumer,告訴consumer當(dāng)前沒(méi)有新消息,;然后consumer接受到這個(gè)回復(fù)后,,就知道要繼續(xù)發(fā)起下一個(gè)長(zhǎng)連接來(lái)拉取。然后假如在這15s中之內(nèi),,broker上有新消息了,,則broker就能立即主動(dòng)利用這個(gè)長(zhǎng)連接通知相應(yīng)的消費(fèi)者,把消息傳給消費(fèi)者,。所以,,可以看出,broker上在處理消費(fèi)者的拉取消息的請(qǐng)求時(shí),,如果當(dāng)前沒(méi)有新消息,,則會(huì)hold住這個(gè)socket連接,最多hold 15s,,超過(guò)15s,,則發(fā)送返回信息,告訴消費(fèi)者當(dāng)前無(wú)消息,,然后消費(fèi)者再次發(fā)送pull message request過(guò)來(lái),。通過(guò)這樣的基于長(zhǎng)連接的拉取模式,我們可以實(shí)現(xiàn)兩個(gè)好處:1)消息實(shí)時(shí)推送,;2)由消費(fèi)者控制消息消費(fèi)進(jìn)度,;


另外,equeue里還實(shí)現(xiàn)了消費(fèi)者自身的自動(dòng)限流功能,。就是假如當(dāng)前broker上消息很多,,即生產(chǎn)者生產(chǎn)消息的速度大于消費(fèi)者消費(fèi)消息的速度,那broker上就會(huì)有消息被堆積,。那此時(shí)消費(fèi)者在拉取消息時(shí),,總是會(huì)有新消息拉取到,但是消費(fèi)者又來(lái)不及處理這么多消息,。所以equeue框架內(nèi)置了一個(gè)限流(流控,,流量控制)的設(shè)計(jì),就是可以允許用于配制一個(gè)消費(fèi)者端堆積的消息的上限,,比如3000,,超過(guò)這個(gè)數(shù)目(可配置),則equeue會(huì)讓消費(fèi)者以慢一點(diǎn)的頻率拉取消息,。比如延遲個(gè)多少毫秒(延遲時(shí)間可配置)再拉取,。這樣就簡(jiǎn)單的實(shí)現(xiàn)了流控的目的。


6.如何處理消息消費(fèi)失敗的情況


作為一個(gè)消息隊(duì)列,消費(fèi)者總是可能會(huì)在消費(fèi)消息時(shí)拋出異常,,在equeue中這種情況就是消息消費(fèi)失敗的情況,。通過(guò)上面的消費(fèi)進(jìn)度的介紹,大家知道了每個(gè)queue對(duì)某個(gè)特定的consumer group,,都有一個(gè)唯一的消費(fèi)進(jìn)度,。實(shí)際上,消息被拉取到consumer本地后,,可能會(huì)被以?xún)煞N方式消費(fèi),,一種是并行消費(fèi),一種是線性消費(fèi),。


并行消費(fèi)的意思是,,假如當(dāng)前一次性拉取過(guò)來(lái)32個(gè)消息,那equeue會(huì)通過(guò)啟動(dòng)task(即開(kāi)多線程)的方式并行消費(fèi)每個(gè)消息,;


線性消費(fèi)的意思是,,消息是在一個(gè)獨(dú)立的單線程中順序消費(fèi),消費(fèi)順序和拉取過(guò)來(lái)的順序相同,。


對(duì)于線性消費(fèi),假如前一個(gè)消息消費(fèi)的時(shí)候失敗了,,也就是拋異常了,,那該怎么辦呢?可能想到的辦法是重試個(gè)3次,,但是要是重試后還是失敗呢,?總不能因?yàn)檫@個(gè)消息而導(dǎo)致后面的消息無(wú)法把消費(fèi)吧?呵呵,!對(duì)于這種情況,,先說(shuō)一下rocketmq里的處理方式吧:它的做法是,當(dāng)遇到消費(fèi)失敗的情況,,沒(méi)有立馬重試,,而是直接把這個(gè)消息發(fā)送到broker上的某個(gè)重試隊(duì)列,發(fā)送成功后,,就可以往下消費(fèi)下一個(gè)消息了,。因?yàn)橐坏┌l(fā)送到重試隊(duì)列,那意味著這個(gè)消息就最后總是會(huì)被消費(fèi)了,,因?yàn)樵撓⒉粫?huì)丟了,。但是要是發(fā)送到broker的重試隊(duì)列也不成功呢?這個(gè),?,!其實(shí)這種情況不大應(yīng)該出現(xiàn),如果出現(xiàn),那基本就是broker掛了,,呵呵,。


rocketmq中,對(duì)于這種情況,,那會(huì)把這個(gè)失敗的消息放入本地內(nèi)存隊(duì)列,,慢慢消費(fèi)它。然后繼續(xù)往后消費(fèi)后面的消息?,F(xiàn)在你一定很關(guān)心queue的offset是如何更新的,?這里涉及到一個(gè)滑動(dòng)門(mén)的概念。當(dāng)一批消息從broker拉取到消費(fèi)者本地后,,并不是馬上消費(fèi)的,,而是先放入一個(gè)本地的SortedDictionary,key就是消息在queue里的位置,,value就是消息本身,。因?yàn)槭且粋€(gè)排序的dictionary,所以key最小的消息意味著是最前面的消息,,最大的消息就是最后面的消息,。然后不管是并行消費(fèi)還是線性消費(fèi),只要某個(gè)消息被消費(fèi)了,,那就從這個(gè)SortedDictionary里移除掉,。每次被移除一個(gè)消息時(shí),總是會(huì)返回當(dāng)前這個(gè)SortedDictionary里的最小的key,,然后我們就能判斷這個(gè)key是否和上次比是否前移了,,如果是,則更新queue的這個(gè)最新的offset,。因?yàn)槊看我瞥粋€(gè)消息的時(shí)候,,總是返回當(dāng)前SortedDictionary里的最小的key,所以,,假如當(dāng)前offset是3,,然后offset為4的這個(gè)消息一直消費(fèi)失敗,所以不會(huì)被移除,,但是offset為5,6,7,8的這些消息雖然都消費(fèi)成功了,,但是只要offset為4的這個(gè)消息沒(méi)有被移除,那最小的key就不會(huì)往前移動(dòng),。這個(gè)就是所謂的滑動(dòng)門(mén)的概念了,。就好比是在鐵軌上一輛在跑的動(dòng)車(chē),offset的往前移動(dòng)就好比是動(dòng)車(chē)在不斷往前移動(dòng),。因?yàn)槲覀兿M鹢ffset總是會(huì)不斷往前移動(dòng),,所以不希望前面的某個(gè)消費(fèi)失敗的消息讓這個(gè)滑動(dòng)門(mén)停止移動(dòng)(即我們總是希望這個(gè)最小的key能不斷變大),,所以我們會(huì)想方設(shè)法讓消費(fèi)失敗的消息能不阻礙滑動(dòng)門(mén)的往前移動(dòng)。所以才把消費(fèi)失敗的消息放入重試隊(duì)列,。


另外一點(diǎn)需要注意一下:并不是每次成功消費(fèi)完一個(gè)消息,,就會(huì)立馬告訴broker更新offset,因?yàn)檫@樣那性能肯定很低,,broker也會(huì)忙死,,更好的辦法是先只是在本地內(nèi)存更新queue的offset,然后定時(shí)比如5s一次,,將最新的offset更新到broker,。所以,因?yàn)檫@個(gè)異步的存在,,同樣也會(huì)導(dǎo)致某個(gè)消息被重復(fù)消費(fèi)的可能性,,因?yàn)閎roker上的offset肯定比實(shí)際的消費(fèi)進(jìn)度要慢,有5s的時(shí)間差,。所以,,再次強(qiáng)調(diào),應(yīng)用方必須要處理好對(duì)消息的冪等處理,!比如enode框架中,,對(duì)每個(gè)command消息,框架內(nèi)部都做了command的冪等處理,。所以使用enode框架的應(yīng)用,,自身無(wú)需對(duì)command做冪等處理方面的考慮。


上面提到了并行消費(fèi)和線性消費(fèi),,其實(shí)對(duì)于offset的更新來(lái)說(shuō)是一樣的,因?yàn)椴⑿邢M(fèi)無(wú)非是多線程同時(shí)從SortedDictionary中移除消費(fèi)成功的消息,,而單線程只是單個(gè)線程去移除SortedDictionary中的消息,。所以我們要通過(guò)鎖的機(jī)制,保證對(duì)SortedDictionary的操作是線程安全的,。目前用了ReaderWriterLockSlim來(lái)實(shí)現(xiàn)對(duì)方法調(diào)用的線層安全,。有興趣的朋友可以去看一下代碼。


最后,,也是重點(diǎn),,呵呵。equeue目前還沒(méi)有實(shí)現(xiàn)將失敗的消息發(fā)回到broker的重試隊(duì)列,。這個(gè)功能以后會(huì)考慮加進(jìn)去,。


7.如何解決Broker的單點(diǎn)問(wèn)題


這個(gè)問(wèn)題比較復(fù)雜,目前equeue不支持broker的master-salve或master-master,,而是單點(diǎn)的,。我覺(jué)得一個(gè)成熟的消息隊(duì)列,,為了確保在一個(gè)broker掛了的時(shí)候,要盡量能確保有其他broker可以接替它,,這樣才能讓消息隊(duì)列服務(wù)器的可靠性,。但是這個(gè)問(wèn)題實(shí)在太復(fù)雜。rocketmq目前實(shí)現(xiàn)的也只是master-slave的方式,。也就是只要主的master掛了,,那producer就無(wú)法向broker發(fā)送消息了,因?yàn)閟lave的broker是只讀的,,不能直接接受新消息,,slave的broker只能允許被consumer拉取消息。


這個(gè)問(wèn)題,,要討論清楚,,需要很多分布式方面的知識(shí)。由于篇幅的原因,,這里就不做討論了,,實(shí)際上我自己也搞不清楚到底該如何設(shè)計(jì)。希望大牛們多多指點(diǎn),,如何實(shí)現(xiàn)broker的高可用哈,!

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,,不代表本站觀點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多