消息隊列連環(huán)炮
消息隊列技術(shù)選型解決的問題:
不用 MQ 系統(tǒng)耦合場景A 系統(tǒng)產(chǎn)生了一個比較關(guān)鍵的數(shù)據(jù),很多系統(tǒng)需要 A 系統(tǒng)將數(shù)據(jù)發(fā)過來,,強(qiáng)耦合(B,C,D,E 系統(tǒng)可能參數(shù)不一樣,、一會需要一會不需要數(shù)據(jù),A 系統(tǒng)要不斷修改代碼維護(hù)) A 系統(tǒng)還要考慮 B,、C,、D、E 系統(tǒng)是否掛了,,是否訪問超時,?是否重試? 使用 MQ 系統(tǒng)解耦場景
總結(jié):通過一個 MQ 的發(fā)布訂閱消息模型(Pub/Sub), 系統(tǒng) A 跟其他系統(tǒng)就徹底解耦了。 不用 MQ 同步高延遲請求場景一般互聯(lián)網(wǎng)類的企業(yè),,對用戶的直接操作,,一般要求每個請求都必須在 200ms以內(nèi),對用戶幾乎是無感知的,。 使用 MQ 進(jìn)行異步化之后的接口性能優(yōu)化提高高延時接口 沒有用 MQ 時高峰期系統(tǒng)被打死的場景高峰期每秒 5000 個請求,,每秒對 MySQL 執(zhí)行 5000 條 SQL(一般MySQL每秒 2000 個請求差不多了),如果MySQL被打死,,然后整個系統(tǒng)就崩潰,,用戶就沒辦法使用系統(tǒng)了。但是高峰期過了之后,,每秒鐘可能就 50 個請求,,對整個系統(tǒng)沒有任何壓力。 使用 MQ 進(jìn)行削峰的場景5000 個請求寫入到 MQ 里面,,系統(tǒng) A 每秒鐘最多只能處理 2000 個請求(MySQL 每秒鐘最多處理 2000 個請求),,系統(tǒng) A 從 MQ 里慢慢拉取請求,每秒鐘拉取 2000 個請求,。MQ,,每秒鐘 5000 個請求進(jìn)來,結(jié)果只有 2000 個請求出去,結(jié)果導(dǎo)致在高峰期(21小時),,可能有幾十萬甚至幾百萬的請求積壓在 MQ 中,,這個是正常的,因為過了高峰期之后,,每秒鐘就 50 個請求,,但是系統(tǒng) A 還是會按照每秒 2000 個該請求的速度去處理。只要高峰期一過,,系統(tǒng) A 就會快速的將積壓的消息給解決掉,。
架構(gòu)中引入 MQ 后存在的問題
MQ 可能掛掉,,導(dǎo)致整個系統(tǒng)崩潰
可能發(fā)重復(fù)消息,導(dǎo)致插入重復(fù)數(shù)據(jù),;消息丟了,;消息順序亂了;系統(tǒng) B,C,D 掛了,,導(dǎo)致 MQ 消息積累,,磁盤滿了;
本來應(yīng)該A,B,C,D 都執(zhí)行成功了再返回,,結(jié)果A,B,C 執(zhí)行成功 D 失敗 Kafka,、ActiveMQ、RabbitMQ,、RocketMQ 有什么優(yōu)缺點(diǎn)建議:中小型公司 RabbitMQ 大公司:RocketMQ 大數(shù)據(jù)實時計算:Kafka 消息隊列高可用RabbtitMQ 高可用RabbitMQ有三種模式:單機(jī)模式 ,、普通集群模式、鏡像集群模式
demo級
隊列的元數(shù)據(jù)存在于多個實例中,,但是消息不存在多個實例中,,每次多臺機(jī)器上啟動多個 rabbitmq 實例,每個機(jī)器啟動一個,。
沒有高可用性可言
隊列的元數(shù)據(jù)和消息都會存在于多個實例中,,每次寫消息到 queue的時候,都會自動把消息到多個實例的 queue 里進(jìn)行消息同步。也就 是每個節(jié)點(diǎn)上都有這個 queue 的一個完整鏡像(這個 queue的全部數(shù)據(jù)),。任何一個節(jié)點(diǎn)宕機(jī)了,,其他節(jié)點(diǎn)還包含這個 queue的完整數(shù)據(jù),其他 consumer 都可以到其他活著的節(jié)點(diǎn)上去消費(fèi)數(shù)據(jù)都是 OK 的,。 缺點(diǎn):不是分布式的,,如果這個 queue的數(shù)據(jù)量很大,大到這個機(jī)器上的容量無法容納 ,。 開啟鏡像集群模式方法:管理控制臺,,Admin頁面下,新增一個鏡像集群模式的策略,,指定的時候可以要求數(shù)據(jù)同步到所有節(jié)點(diǎn),,也可以要求同步到指定數(shù)量的節(jié)點(diǎn),然后你再次創(chuàng)建 queue 的時候 ,,應(yīng)用這個策略,,就 會自動將數(shù)據(jù)同步到其他的節(jié)點(diǎn)上去。
broker進(jìn)程就是kafka在每臺機(jī)器上啟動的自己的一個進(jìn)程,。每臺機(jī)器+機(jī)器上的broker進(jìn)程,,就可以認(rèn)為是 kafka集群中的一個節(jié)點(diǎn)。 你創(chuàng)建一個 topic,這個topic可以劃分為多個 partition,每個 partition 可以存在于不同的 broker 上,,每個 partition就存放一部分?jǐn)?shù)據(jù),。 這就是天然的分布式消息隊列,也就是說一個 topic的數(shù)據(jù),,是分散放在 多個機(jī)器上的,,每個機(jī)器就放一部分?jǐn)?shù)據(jù)。 分布式的真正含義是每個節(jié)點(diǎn)只放一部分?jǐn)?shù)據(jù),,而不是完整數(shù)據(jù)(完整數(shù)據(jù)就是HA,、集群機(jī)制)
每個 partition的數(shù)據(jù)都會同步到其他機(jī)器上,形成自己的多個 replica 副本,。然后所有 replica 會選舉一個 leader,。那么生產(chǎn)者、消費(fèi)者都會和這個 leader 打交道,,然后其他 replica 就是 follow,。寫的時候,leader 負(fù)責(zé)把數(shù)據(jù)同步到所有 follower上去,讀的時候就直接讀 leader 上的數(shù)據(jù)即可,。 如果某個 broker宕機(jī)了,,剛好也是 partition的leader,那么此時會選舉一個新的 leader出來,,大家繼續(xù)讀寫那個新的 leader即可,,這個就 是所謂的高可用性。更多面試題:面試題內(nèi)容聚合 leader和follower的同步機(jī)制:寫數(shù)據(jù)的時候,,生產(chǎn)者就寫 leader,,然后 leader將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動從 leader來pull數(shù)據(jù),。一旦所有 follower同步好數(shù)據(jù)了,,就會發(fā)送 ack給 leader,leader收到所有 follower的 ack之后,,就會返回寫成功的消息給生產(chǎn)者,。 消費(fèi)的時候,只會從 leader去讀,,但是只有一個消息已經(jīng)被所有 follower都同步成功返回 ack的時候,,這個消息才會被消費(fèi)者讀到。 消息隊列重復(fù)數(shù)據(jù)MQ 只能保證消息不丟,,不能保證重復(fù)發(fā)送 Kafka 消費(fèi)端可能出現(xiàn)的重復(fù)消費(fèi)問題每條消息都有一個 offset 代表 了這個消息的順序的序號,,按照數(shù)據(jù)進(jìn)入 kafka的順序,kafka會給每條數(shù)據(jù)分配一個 offset,代表了這個是數(shù)據(jù)的序號,,消費(fèi)者從 kafka去消費(fèi)的時候,,按照這個順序去消費(fèi),消費(fèi)者會去提交 offset,,就是告訴 kafka已經(jīng)消費(fèi)到 offset=153這條數(shù)據(jù)了 ,;zk里面就記錄了消費(fèi)者當(dāng)前消費(fèi)到了 offset =幾的那條消息;假如此時消費(fèi)者系統(tǒng)被重啟,,重啟之后,,消費(fèi)者會找kafka,讓kafka把上次我消費(fèi)到的那個地方后面的數(shù)據(jù)繼續(xù)給我傳遞過來,。更多面試題:面試題內(nèi)容聚合
消費(fèi)者不是說消費(fèi)完一條數(shù)據(jù)就立馬提交 offset的,,而是定時定期提交一次 offset。消費(fèi)者如果再準(zhǔn)備提交 offset,,但是還沒提交 offset的時候,,消費(fèi)者進(jìn)程重啟了,那么此時已經(jīng)消費(fèi)過的消息的 offset并沒有提交,,kafka也就不知道你已經(jīng)消費(fèi)了 offset= 153那條數(shù)據(jù),,這個時候kafka會給你發(fā)offset=152,153,154的數(shù)據(jù),,此時 offset = 152,153的消息重復(fù)消費(fèi)了 保證 MQ 重復(fù)消費(fèi)冪等性冪等:一個數(shù)據(jù)或者一個請求,給你重復(fù)來多次,,你得確保對應(yīng)的數(shù)據(jù)是不會改變的,不能出錯,。
保證 MQ 消息不丟MQ 傳遞非常核心的消息,,比如:廣告計費(fèi)系統(tǒng),,用戶點(diǎn)擊一次廣告,扣費(fèi)一塊錢,,如果扣費(fèi)的時候消息丟了,,則會不斷少錢,積少成多,,對公司是一個很大的損失,。 RabbitMQ可能存在的數(shù)據(jù)丟失問題
問題 1解決方案: 事務(wù)機(jī)制:(一般不采用,,同步的,,生產(chǎn)者發(fā)送消息會同步阻塞卡住等待你是成功還是失敗。會導(dǎo)致生產(chǎn)者發(fā)送消息的吞吐量降下來) channel.txSelect confirm機(jī)制:(一般采用這種機(jī)制,,異步的模式,,不會阻塞,吞吐量會比較高)
問題 2 解決方案: 持久化到磁盤
缺點(diǎn):可能會有一點(diǎn)點(diǎn)丟失數(shù)據(jù)的可能,消息剛好寫到了 rabbitmq中,,但是還沒來得及持久化到磁盤上,,結(jié)果不巧, rabbitmq掛了,,會導(dǎo)致內(nèi)存里的一點(diǎn)點(diǎn)數(shù)據(jù)會丟失,。更多面試題:面試題內(nèi)容聚合 問題 3 解決方案: 原因:消費(fèi)者打開了 autoAck機(jī)制(消費(fèi)到一條消息,還在處理中,,還沒處理完,,此時消費(fèi)者自動 autoAck了,通知 rabbitmq說這條消息已經(jīng)消費(fèi)了,,此時不巧,,消費(fèi)者系統(tǒng)宕機(jī)了,,那條消息丟失了,還沒處理完,,而且 rabbitmq還以為這個消息已經(jīng)處理掉了) 解決方案:關(guān)閉 autoAck,自己處理完了一條消息后,,再發(fā)送 ack給 rabbitmq,如果此時還沒處理完就宕機(jī)了,此時rabbitmq沒收到你發(fā)的ack消息,,然后 rabbitmq 就會將這條消息重新分配給其他的消費(fèi)者去處理,。 Kafka 可能存在的數(shù)據(jù)丟失問題消費(fèi)端弄丟數(shù)據(jù)原因:消費(fèi)者消費(fèi)到那條消息后,自動提交了 offset,,kafka以為你已經(jīng)消費(fèi)好了這條消息,結(jié)果消費(fèi)者掛了,,這條消息就丟了,。 例子:消費(fèi)者消費(fèi)到數(shù)據(jù)后寫到一個內(nèi)存 queue里緩存下,消息自動提交 offset,,重啟了系統(tǒng),,結(jié)果會導(dǎo)致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)丟失。 解決方法:kafka會自動提交 offset,,那么只要關(guān)閉自動提交 offset,,在處理完之后自己手動提交,可以保證數(shù)據(jù)不會丟,。但是此時確實還是會重復(fù)消費(fèi),,比如剛好處理完,還沒提交 offset,,結(jié)果自己掛了,,此時肯定會重復(fù)消費(fèi)一次 ,做好冪等即可,。 Kafka 丟掉消息原因:kafka 某個 broker 宕機(jī),,然后重新選舉 partition 的 leader時,此時其他的 follower 剛好還有一些數(shù)據(jù)沒有同步,,結(jié)果此時 leader掛了,,然后選舉某個 follower成 leader之后,就丟掉了之前l(fā)eader里未同步的數(shù)據(jù),。更多面試題:面試題內(nèi)容聚合 例子:kafka的leader機(jī)器宕機(jī),,將 follower 切換為 leader之后,發(fā)現(xiàn)數(shù)據(jù)丟了
按 2 的方案設(shè)置了 ack =all,,一定不會丟,。它會要求 leader 接收到消息,所有的 follower 都同步 到了消息之后,,才認(rèn)為本次寫成功,。如果沒滿足這個條件,生產(chǎn)者會無限次重試 ,。 消息隊列順序性背景:mysql binlog 同步的系統(tǒng),,在mysql里增刪改一條數(shù)據(jù),對應(yīng)出來了增刪改 3 條binlog,,接著這 3 條binlog發(fā)送到 MQ 里面,,到消費(fèi)出來依次執(zhí)行,起碼是要保證順序的吧,,不然順序變成了 刪除,、修改、增加,。日同步數(shù)據(jù)達(dá)到上億,,mysql->mysql,比如大數(shù)據(jù) team,需要同步一個mysql庫,,來對公司的業(yè)務(wù)系統(tǒng)的數(shù)據(jù)做各種復(fù)雜的操作,。 場景:
RabbitMQ 消息順序錯亂RabbitMQ 如何保證消息順序性需要保證順序的數(shù)據(jù)放到同一個queue里 Kafka 消息順序錯亂寫入一個 partition中的數(shù)據(jù)一定是有順序的,。 生產(chǎn)者在寫的時候,,可以指定一個 key,比如訂單id作為key,那么訂單相關(guān)的數(shù)據(jù),,一定會被分發(fā)到一個 partition中區(qū),,此時這個 partition中的數(shù)據(jù)一定是有順序的,。Kafka 中一個 partition 只能被一個消費(fèi)者消費(fèi)。消費(fèi)者從partition中取出數(shù)據(jù)的時候 ,,一定是有順序的,。 Kafka 保證消息順序性如果消費(fèi)者單線程消費(fèi)+處理,如果處理比較耗時,,處理一條消息是幾十ms,,一秒鐘只能處理幾十條數(shù)據(jù),這個吞吐量太低了,??隙ㄒ枚嗑€程去并發(fā)處理,壓測消費(fèi)者4 核 8G 單機(jī),,32 條線程,,最高每秒可以處理上千條消息 消息隊列延遲以及過期失效消費(fèi)端出了問題,不消費(fèi)了或者消費(fèi)極其慢,。接著坑爹了,你的消息隊列集群的磁盤都快寫滿了 ,,都沒人消費(fèi),,怎么辦?積壓了幾個小時,,rabbitmq設(shè)置了消息過期時間后就沒了,,怎么辦? 例如:
場景:幾千萬條數(shù)據(jù)再 MQ 里積壓了七八個小時 快速處理積壓的消息一個消費(fèi)者一秒是 1000 條,,一秒 3 個消費(fèi)者是 3000 條,一分鐘是 18W 條,,1000 多 W 條需要一個小時恢復(fù),。 步驟:
原來 3 個消費(fèi)者需要 1 個小時可以搞定,,現(xiàn)在 30 個臨時消費(fèi)者需要 10 分鐘就可以搞定。 如果用的 rabbitmq,,并且設(shè)置了過期時間,,如果此消費(fèi)在 queue里積壓超過一定的時間會被 rabbitmq清理掉,數(shù)據(jù)直接搞丟,。 如果消息積壓mq,長時間沒被處理掉,,導(dǎo)致mq快寫完滿了,,你臨時寫一個程序,接入數(shù)據(jù)來消費(fèi),,寫到一個臨時的mq里,,再讓其他消費(fèi)者慢慢消費(fèi) 或者消費(fèi)一個丟棄一個,都不要了,,快速消費(fèi)掉所有的消息,,然后晚上補(bǔ)數(shù)據(jù)。 如何設(shè)計消息隊列中間件架構(gòu)
|
|