久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Kafka剖析(一):高擴(kuò)展、高吞吐的分布式消息系統(tǒng)初探

 云端素館 2015-03-11


Kafka是由LinkedIn開(kāi)發(fā)的一個(gè)分布式的消息系統(tǒng),,使用Scala編寫(xiě),,它以可水平擴(kuò)展和高吞吐率而被廣泛使用。目前越來(lái)越多的開(kāi)源分布式處理系統(tǒng)如Cloudera,、Apache Storm,、Spark都支持與Kafka集成。InfoQ一直在緊密關(guān)注Kafka的應(yīng)用以及發(fā)展,,“Kafka剖析”專欄將會(huì)從架構(gòu)設(shè)計(jì),、實(shí)現(xiàn)、應(yīng)用場(chǎng)景,、性能等方面深度解析Kafka,。


背景介紹


Kafka是一個(gè)消息系統(tǒng),原本開(kāi)發(fā)自LinkedIn,,用作LinkedIn的活動(dòng)流(Activity Stream)和運(yùn)營(yíng)數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)?,F(xiàn)在它已被多家不同類型的公司作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。


活動(dòng)流數(shù)據(jù)是幾乎所有站點(diǎn)在對(duì)其網(wǎng)站使用情況做報(bào)表時(shí)都要用到的數(shù)據(jù)中最常規(guī)的部分,?;顒?dòng)數(shù)據(jù)包括頁(yè)面訪問(wèn)量、被查看內(nèi)容方面的信息以及搜索情況等內(nèi)容,。這種數(shù)據(jù)通常的處理方式是先把各種活動(dòng)以日志的形式寫(xiě)入某種文件,,然后周期性地對(duì)這些文件進(jìn)行統(tǒng)計(jì)分析。運(yùn)營(yíng)數(shù)據(jù)指的是服務(wù)器的性能數(shù)據(jù)(CPU,、IO使用率,、請(qǐng)求時(shí)間、服務(wù)日志等等數(shù)據(jù)),。運(yùn)營(yíng)數(shù)據(jù)的統(tǒng)計(jì)方法種類繁多,。


近年來(lái),活動(dòng)和運(yùn)營(yíng)數(shù)據(jù)處理已經(jīng)成為了網(wǎng)站軟件產(chǎn)品特性中一個(gè)至關(guān)重要的組成部分,,這就需要一套稍微更加復(fù)雜的基礎(chǔ)設(shè)施對(duì)其提供支持,。


Kafka簡(jiǎn)介


Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng),。主要設(shè)計(jì)目標(biāo)如下:

以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問(wèn)性能。


高吞吐率,。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條以上消息的傳輸,。


支持Kafka Server間的消息分區(qū),及分布式消費(fèi),,同時(shí)保證每個(gè)Partition內(nèi)的消息順序傳輸,。


同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理,。


Scale out:支持在線水平擴(kuò)展。


為何使用消息系統(tǒng),?


解耦

在項(xiàng)目啟動(dòng)之初來(lái)預(yù)測(cè)將來(lái)項(xiàng)目會(huì)碰到什么需求,,是極其困難的。消息系統(tǒng)在處理過(guò)程中間插入了一個(gè)隱含的,、基于數(shù)據(jù)的接口層,,兩邊的處理過(guò)程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,,只要確保它們遵守同樣的接口約束。


冗余

有些情況下,,處理數(shù)據(jù)的過(guò)程會(huì)失敗,。除非數(shù)據(jù)被持久化,否則將造成丟失,。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,,通過(guò)這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中,,在把一個(gè)消息從隊(duì)列中刪除之前,,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢,。


擴(kuò)展性

因?yàn)橄㈥?duì)列解耦了你的處理過(guò)程,,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過(guò)程即可,。不需要改變代碼,、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡(jiǎn)單,。


靈活性 & 峰值處理能力

在訪問(wèn)量劇增的情況下,,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見(jiàn),;如果為以能處理這類峰值訪問(wèn)為標(biāo)準(zhǔn)來(lái)投入資源隨時(shí)待命無(wú)疑是巨大的浪費(fèi),。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰,。


可恢復(fù)性

系統(tǒng)的一部分組件失效時(shí),,不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,,所以即使一個(gè)處理消息的進(jìn)程掛掉,,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。


順序保證

在大多使用場(chǎng)景下,,數(shù)據(jù)處理的順序都很重要,。大部分消息隊(duì)列本來(lái)就是排序的,,并且能保證數(shù)據(jù)會(huì)按照特定的順序來(lái)處理。Kafka保證一個(gè)Partition內(nèi)的消息的有序性,。


緩沖

在任何重要的系統(tǒng)中,,都會(huì)有需要不同的處理時(shí)間的元素。例如,,加載一張圖片比應(yīng)用過(guò)濾器花費(fèi)更少的時(shí)間,。消息隊(duì)列通過(guò)一個(gè)緩沖層來(lái)幫助任務(wù)最高效率的執(zhí)行———寫(xiě)入隊(duì)列的處理會(huì)盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度,。


異步通信

很多時(shí)候,,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,,允許用戶把一個(gè)消息放入隊(duì)列,,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,,然后在需要的時(shí)候再去處理它們,。


常用Message Queue對(duì)比


RabbitMQ

RabbitMQ是使用Erlang編寫(xiě)的一個(gè)開(kāi)源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP,,XMPP, SMTP, STOMP,,也正因如此,它非常重量級(jí),,更適合于企業(yè)級(jí)的開(kāi)發(fā),。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì),。對(duì)路由,,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。


Redis

Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫(kù),,開(kāi)發(fā)維護(hù)很活躍,。雖然它是一個(gè)Key-Value數(shù)據(jù)庫(kù)存儲(chǔ)系統(tǒng),但它本身支持MQ功能,,所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來(lái)使用,。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬(wàn)次,,每10萬(wàn)次記錄一次執(zhí)行時(shí)間,。測(cè)試數(shù)據(jù)分為128Bytes、512Bytes,、1K和10K四個(gè)不同大小的數(shù)據(jù),。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ,而如果數(shù)據(jù)大小超過(guò)了10K,,Redis則慢的無(wú)法忍受,;出隊(duì)時(shí),無(wú)論數(shù)據(jù)大小,,Redis都表現(xiàn)出非常好的性能,,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis。


ZeroMQ

ZeroMQ號(hào)稱最快的消息隊(duì)列系統(tǒng),,尤其針對(duì)大吞吐量的需求場(chǎng)景,。ZeroMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列,但是開(kāi)發(fā)人員需要自己組合多種技術(shù)框架,,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn),。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,,因?yàn)槟愕膽?yīng)用程序?qū)缪葸@個(gè)服務(wù)器角色,。你只需要簡(jiǎn)單的引用ZeroMQ程序庫(kù),可以使用NuGet安裝,,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但是ZeroMQ僅提供非持久性的隊(duì)列,,也就是說(shuō)如果宕機(jī),,數(shù)據(jù)將會(huì)丟失。其中,,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開(kāi)始同時(shí)支持ZeroMQ和Netty作為傳輸模塊),。


ActiveMQ

ActiveMQ是Apache下的一個(gè)子項(xiàng)目。類似于ZeroMQ,,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列,。同時(shí)類似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場(chǎng)景,。


Kafka/Jafka

Kafka是Apache下的一個(gè)子項(xiàng)目,,是一個(gè)高性能跨語(yǔ)言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng),而Jafka是在Kafka之上孵化而來(lái)的,,即Kafka的一個(gè)升級(jí)版,。具有以下特性:快速持久化,可以在O(1)的系統(tǒng)開(kāi)銷下進(jìn)行消息持久化,;高吞吐,,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng),,Broker,、Producer、Consumer都原生自動(dòng)支持分布式,,自動(dòng)實(shí)現(xiàn)負(fù)載均衡,;支持Hadoop數(shù)據(jù)并行加載,,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,,這是一個(gè)可行的解決方案,。Kafka通過(guò)Hadoop的并行加載機(jī)制統(tǒng)一了在線和離線的消息處理。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng),,除了性能非常好之外,,還是一個(gè)工作良好的分布式系統(tǒng)。


Kafka架構(gòu)


Terminology

系統(tǒng)的一部分組件失效時(shí),,不會(huì)影響到整個(gè)系統(tǒng),。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理,。


Broker

Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker


Topic

每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為T(mén)opic,。(物理上不同Topic的消息分開(kāi)存儲(chǔ),,邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)


Partition

Parition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.


Producer

負(fù)責(zé)發(fā)布消息到Kafka broker


Consumer

消息消費(fèi)者,,向Kafka broker讀取消息的客戶端。


Consumer Group

每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,,若不指定group name則屬于默認(rèn)的group),。


Kafka拓?fù)浣Y(jié)構(gòu)


  

如上圖所示,一個(gè)典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,,或者是服務(wù)器日志,,系統(tǒng)CPU、Memory等),,若干broker(Kafka支持水平擴(kuò)展,,一般broker數(shù)量越多,集群吞吐率越高),,若干Consumer Group,,以及一個(gè)Zookeeper集群。Kafka通過(guò)Zookeeper管理集群配置,,選舉leader,,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,,Consumer使用pull模式從broker訂閱并消費(fèi)消息,。


Topic & Partition

Topic在邏輯上可以被認(rèn)為是一個(gè)queue,每條消費(fèi)都必須指定它的Topic,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里,。為了使得Kafka的吞吐率可以線性提高,,物理上把Topic分成一個(gè)或多個(gè)Partition,每個(gè)Partition在物理上對(duì)應(yīng)一個(gè)文件夾,,該文件夾下存儲(chǔ)這個(gè)Partition的所有消息和索引文件,。若創(chuàng)建topic1和topic2兩個(gè)topic,且分別有13個(gè)和19個(gè)分區(qū),,則整個(gè)集群上會(huì)相應(yīng)會(huì)生成共32個(gè)文件夾(本文所用集群共8個(gè)節(jié)點(diǎn),,此處topic1和topic2 replication-factor均為1),如下圖所示,。

  

每個(gè)日志文件都是一個(gè)log entrie序列,,每個(gè)log entrie包含一個(gè)4字節(jié)整型數(shù)值(值為N+5),1個(gè)字節(jié)的"magic value",,4個(gè)字節(jié)的CRC校驗(yàn)碼,,其后跟N個(gè)字節(jié)的消息體。每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset,,它指明了這條消息的起始位置,。磁盤(pán)上存儲(chǔ)的消息格式如下:



這個(gè)log entries并非由一個(gè)文件構(gòu)成,而是分成多個(gè)segment,,每個(gè)segment以該segment第一條消息的offset命名并以“.kafka”為后綴,。另外會(huì)有一個(gè)索引文件,它標(biāo)明了每個(gè)segment下包含的log entry的offset范圍,,如下圖所示。


     

因?yàn)槊織l消息都被append到該P(yáng)artition中,,屬于順序?qū)懘疟P(pán),,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P(pán)效率比隨機(jī)寫(xiě)內(nèi)存還要高,,這是Kafka高吞吐率的一個(gè)很重要的保證),。


  

  

對(duì)于傳統(tǒng)的message queue而言,一般會(huì)刪除已經(jīng)被消費(fèi)的消息,,而Kafka集群會(huì)保留所有的消息,,無(wú)論其被消費(fèi)與否。當(dāng)然,,因?yàn)榇疟P(pán)限制,,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒(méi)必要),因此Kafka提供兩種策略刪除舊數(shù)據(jù),。一是基于時(shí)間,,二是基于Partition文件大小。例如可以通過(guò)配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數(shù)據(jù),,也可在Partition文件超過(guò)1GB時(shí)刪除舊數(shù)據(jù),,配置如下所示(點(diǎn)擊圖片放大):


這里要注意,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),,即與文件大小無(wú)關(guān),,所以這里刪除過(guò)期文件與提高Kafka性能無(wú)關(guān)。選擇怎樣的刪除策略只與磁盤(pán)以及具體的需求有關(guān),。另外,,Kafka會(huì)為每一個(gè)Consumer Group保留一些metadata信息——當(dāng)前消費(fèi)的消息的position,也即offset,。這個(gè)offset由Consumer控制,。正常情況下Consumer會(huì)在消費(fèi)完一條消息后遞增該offset。當(dāng)然,,Consumer也可將offset設(shè)成一個(gè)較小的值,,重新消費(fèi)一些消息。因?yàn)閛ffet由Consumer控制,,所以Kafka broker是無(wú)狀態(tài)的,,它不需要標(biāo)記哪些消息被哪些消費(fèi)過(guò),也不需要通過(guò)broker去保證同一個(gè)Consumer Group只有一個(gè)Consumer能消費(fèi)某一條消息,,因此也就不需要鎖機(jī)制,,這也為Kafka的高吞吐率提供了有力保障。

Producer消息路由


Producer發(fā)送消息到broker時(shí),,會(huì)根據(jù)Paritition機(jī)制選擇將其存儲(chǔ)到哪一個(gè)Partition,。如果Partition機(jī)制設(shè)置合理,所有消息可以均勻分布到不同的Partition里,,這樣就實(shí)現(xiàn)了負(fù)載均衡,。如果一個(gè)Topic對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器I/O將會(huì)成為這個(gè)Topic的性能瓶頸,,而有了Partition后,,不同的消息可以并行寫(xiě)入不同broker的不同Partition里,極大的提高了吞吐率,??梢栽?strong>$KAFKA_HOME/config/server.properties中通過(guò)配置項(xiàng)num.partitions來(lái)指定新建Topic的默認(rèn)Partition數(shù)量,也可在創(chuàng)建Topic時(shí)通過(guò)參數(shù)指定,,同時(shí)也可以在Topic創(chuàng)建之后通過(guò)Kafka提供的工具修改,。

在發(fā)送一條消息時(shí),可以指定這條消息的key,,Producer根據(jù)這個(gè)key和Partition機(jī)制來(lái)判斷應(yīng)該將這條消息發(fā)送到哪個(gè)Parition,。Paritition機(jī)制可以通過(guò)指定Producer的paritition.class這一參數(shù)來(lái)指定,,該class必須實(shí)現(xiàn)kafka.producer.Partitioner接口。本例中如果key可以被解析為整數(shù)則將對(duì)應(yīng)的整數(shù)與Partition總數(shù)取余,,該消息會(huì)被發(fā)送到該數(shù)對(duì)應(yīng)的Partition,。(每個(gè)Parition都會(huì)有個(gè)序號(hào),序號(hào)從0開(kāi)始)(點(diǎn)擊圖片放大)


如果將上例中的類作為partition.class,并通過(guò)如下代碼發(fā)送20條消息(key分別為0,,1,,2,3)至topic3(包含4個(gè)Partition),。


則key相同的消息會(huì)被發(fā)送并存儲(chǔ)到同一個(gè)partition里,,而且key的序號(hào)正好和Partition序號(hào)相同。(Partition序號(hào)從0開(kāi)始,,本例中的key也從0開(kāi)始),。下圖所示是通過(guò)Java程序調(diào)用Consumer后打印出的消息列表。

  

Consumer Group


本節(jié)所有描述都是基于Consumer hight level API而非low level API,。

使用Consumer high level API時(shí),,同一Topic的一條消息只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi),但多個(gè)Consumer Group可同時(shí)消費(fèi)這一消息,。



這是Kafka用來(lái)實(shí)現(xiàn)一個(gè)Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個(gè)Consumer)的手段,。一個(gè)Topic可以對(duì)應(yīng)多個(gè)Consumer Group。如果需要實(shí)現(xiàn)廣播,,只要每個(gè)Consumer有一個(gè)獨(dú)立的Group就可以了,。要實(shí)現(xiàn)單播只要所有的Consumer在同一個(gè)Group里。用Consumer Group還可以將Consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的Topic,。
實(shí)際上,,Kafka的設(shè)計(jì)理念之一就是同時(shí)提供離線處理和實(shí)時(shí)處理。根據(jù)這一特性,,可以使用Storm這種實(shí)時(shí)流處理系統(tǒng)對(duì)消息進(jìn)行實(shí)時(shí)在線處理,,同時(shí)使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時(shí)將數(shù)據(jù)實(shí)時(shí)備份到另一個(gè)數(shù)據(jù)中心,,只需要保證這三個(gè)操作所使用的Consumer屬于不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡(jiǎn)化部署示意圖,。

  
  

下面這個(gè)例子更清晰地展示了Kafka Consumer Group的特性,。首先創(chuàng)建一個(gè)Topic(名為topic1,包含3個(gè)Partition),,然后創(chuàng)建一個(gè)屬于group1的Consumer實(shí)例,,并創(chuàng)建三個(gè)屬于group2的Consumer實(shí)例,最后通過(guò)Producer向topic1發(fā)送key分別為1,,2,,3的消息,。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息,同時(shí)group2中的3個(gè)Consumer分別收到了key為1,,2,,3的消息。如下圖所示,。


Push vs. Pull  


作為一個(gè)消息系統(tǒng),,Kafka遵循了傳統(tǒng)的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息,。一些logging-centric system,,比如Facebook的Scribe和Cloudera的Flume,采用push模式,。事實(shí)上,,push模式和pull模式各有優(yōu)劣。
push模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,,因?yàn)橄l(fā)送速率是由broker決定的,。push模式的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來(lái)不及處理消息,,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞,。而pull模式則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
對(duì)于Kafka而言,,pull模式更合適,。pull模式可簡(jiǎn)化broker的設(shè)計(jì),Consumer可自主控制消費(fèi)消息的速率,,同時(shí)Consumer可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),,同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語(yǔ)義。

Kafka delivery guarantee


有這么幾種可能的delivery guarantee:

At most once 消息可能會(huì)丟,,但絕不會(huì)重復(fù)傳輸


At least one 消息絕不會(huì)丟,,但可能會(huì)重復(fù)傳輸


Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次,很多時(shí)候這是用戶所想要的,。

 

當(dāng)Producer向broker發(fā)送消息時(shí),,一旦這條消息被commit,因數(shù)replication的存在,,它就不會(huì)丟,。但是如果Producer發(fā)送數(shù)據(jù)給broker后,遇到網(wǎng)絡(luò)問(wèn)題而造成通信中斷,,那Producer就無(wú)法判斷該條消息是否已經(jīng)commit,。雖然Kafka無(wú)法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是Producer可以生成一種類似于主鍵的東西,,發(fā)生故障時(shí)冪等性的重試多次,,這樣就做到了Exactly once,。截止到目前(Kafka 0.8.2版本,2015-03-04),,這一Feature還并未實(shí)現(xiàn),,有希望在Kafka未來(lái)的版本中實(shí)現(xiàn)。(所以目前默認(rèn)情況下一條消息從Producer到broker是確保了At least once,,可通過(guò)設(shè)置Producer異步發(fā)送實(shí)現(xiàn)At most once),。


接下來(lái)討論的是消息從broker到Consumer的delivery guarantee語(yǔ)義。(僅針對(duì)Kafka consumer high level API),。Consumer在從broker讀取消息后,,可以選擇commit,該操作會(huì)在Zookeeper中保存該Consumer在該P(yáng)artition中讀取的消息的offset,。該Consumer下一次再讀該P(yáng)artition時(shí)會(huì)從下一條開(kāi)始讀取,。如未commit,下一次讀取的開(kāi)始位置會(huì)跟上一次commit之后的開(kāi)始位置相同,。當(dāng)然可以將Consumer設(shè)置為autocommit,,即Consumer一旦讀到數(shù)據(jù)立即自動(dòng)commit。如果只討論這一讀取消息的過(guò)程,,那Kafka是確保了Exactly once,。但實(shí)際使用中應(yīng)用程序并非在Consumer讀取完數(shù)據(jù)就結(jié)束了,而是要進(jìn)行進(jìn)一步處理,,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic,。

讀完消息先commit再處理消息。這種模式下,,如果Consumer在commit后還沒(méi)來(lái)得及處理消息就crash了,,下次重新開(kāi)始工作后就無(wú)法讀到剛剛已提交而未處理的消息,這就對(duì)應(yīng)于At most once 讀完消息先處理再commit,。這種模式下,,如果在處理完消息之后commit之前Consumer crash了,下次重新開(kāi)始工作時(shí)還會(huì)處理剛剛未commit的消息,,實(shí)際上該消息已經(jīng)被處理過(guò)了,。這就對(duì)應(yīng)于At least once。


在很多使用場(chǎng)景下,,消息都有一個(gè)主鍵,,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,,那就可以認(rèn)為是Exactly once。(筆者認(rèn)為這種說(shuō)法比較牽強(qiáng),,畢竟它不是Kafka本身提供的機(jī)制,,主鍵本身也并不能完全保證操作的冪等性,。而且實(shí)際上我們說(shuō)delivery guarantee 語(yǔ)義是討論被處理多少次,而非處理結(jié)果怎樣,,因?yàn)樘幚矸绞蕉喾N多樣,,我們不應(yīng)該把處理過(guò)程的特性——如是否冪等性,當(dāng)成Kafka本身的Feature)


如果一定要做到Exactly once,,就需要協(xié)調(diào)offset和實(shí)際操作的輸出,。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個(gè)地方,,會(huì)更簡(jiǎn)潔和通用,。這種方式可能更好,因?yàn)樵S多輸出系統(tǒng)可能不支持兩階段提交,。比如,,Consumer拿到數(shù)據(jù)后可能把數(shù)據(jù)放到HDFS,如果把最新的offset和數(shù)據(jù)本身一起寫(xiě)到HDFS,,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都完成,,要么都不完成,間接實(shí)現(xiàn)Exactly once,。(目前就high level API而言,,offset是存于Zookeeper中的,無(wú)法存于HDFS,,而low level API的offset是由自己去維護(hù)的,,可以將之存于HDFS中)


總之,Kafka默認(rèn)保證At least once,,并且允許通過(guò)設(shè)置Producer異步提交來(lái)實(shí)現(xiàn)At most once,。而Exactly once要求與外部存儲(chǔ)系統(tǒng)協(xié)作,幸運(yùn)的是Kafka提供的offset可以非常直接非常容易得使用這種方式,。


作者簡(jiǎn)介


郭?。↗ason),碩士,,從事大數(shù)據(jù)平臺(tái)研發(fā)工作,,精通Kafka等分布式消息系統(tǒng)及Storm等流式處理系統(tǒng)。

新浪微博: 郭俊_Jason


微信:habren


博客:http://www.


下篇預(yù)告


下一篇將深入講解Kafka是如何做Replication和Leader Election的,。在Kafka0.8以前的版本中,,如果某個(gè)broker宕機(jī),或者磁盤(pán)出現(xiàn)問(wèn)題,,則該broker上所有partition的數(shù)據(jù)都會(huì)丟失,。而Kafka0.8以后加入了Replication機(jī)制,可以將每個(gè)Partition的數(shù)據(jù)備份多份,,即使某些broker宕機(jī)也能保證系統(tǒng)的可用性和數(shù)據(jù)的完整性,。




如果想要評(píng)論本篇文章,,想看下其他讀者都有什么話想說(shuō),歡迎點(diǎn)擊“閱讀原文”參與討論,。


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多