該論文來自Berkeley實驗室,,英文標(biāo)題為:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing,。下面的翻譯,我是基于科學(xué)網(wǎng)翻譯基礎(chǔ)上進(jìn)行優(yōu)化,、修改,、補充,這篇譯文翻譯得很不錯。在此基礎(chǔ)上,,我增加了來自英文原文的圖和表格數(shù)據(jù),,以及譯文中缺少的未翻譯的部分。如果翻譯措辭或邏輯有誤,,歡迎批評指正,。 摘要本文提出了分布式內(nèi)存抽象的概念——彈性分布式數(shù)據(jù)集(RDD,Resilient Distributed Datasets),,它具備像MapReduce等數(shù)據(jù)流模型的容錯特性,,并且允許開發(fā)人員在大型集群上執(zhí)行基于內(nèi)存的計算。現(xiàn)有的數(shù)據(jù)流系統(tǒng)對兩種應(yīng)用的處理并不高效:一是迭代式算法,,這在圖應(yīng)用和機器學(xué)習(xí)領(lǐng)域很常見,;二是交互式數(shù)據(jù)挖掘工具。這兩種情況下,,將數(shù)據(jù)保存在內(nèi)存中能夠極大地提高性能,。為了有效地實現(xiàn)容錯,RDD提供了一種高度受限的共享內(nèi)存,,即RDD是只讀的,,并且只能通過其他RDD上的批量操作來創(chuàng)建。盡管如此,,RDD仍然足以表示很多類型的計算,,包括MapReduce和專用的迭代編程模型(如Pregel)等。我們實現(xiàn)的RDD在迭代計算方面比Hadoop快20多倍,,同時還可以在5-7秒內(nèi)交互式地查詢1TB數(shù)據(jù)集,。 1.引言無論是工業(yè)界還是學(xué)術(shù)界,都已經(jīng)廣泛使用高級集群編程模型來處理日益增長的數(shù)據(jù),,如MapReduce和Dryad,。這些系統(tǒng)將分布式編程簡化為自動提供位置感知性調(diào)度、容錯以及負(fù)載均衡,,使得大量用戶能夠在商用集群上分析超大數(shù)據(jù)集,。 大多數(shù)現(xiàn)有的集群計算系統(tǒng)都是基于非循環(huán)的數(shù)據(jù)流模型。從穩(wěn)定的物理存儲(如分布式文件系統(tǒng))中加載記錄,,記錄被傳入由一組確定性操作構(gòu)成的DAG,,然后寫回穩(wěn)定存儲。DAG數(shù)據(jù)流圖能夠在運行時自動實現(xiàn)任務(wù)調(diào)度和故障恢復(fù),。 盡管非循環(huán)數(shù)據(jù)流是一種很強大的抽象方法,,但仍然有些應(yīng)用無法使用這種方式描述。我們就是針對這些不太適合非循環(huán)模型的應(yīng)用,,它們的特點是在多個并行操作之間重用工作數(shù)據(jù)集,。這類應(yīng)用包括:(1)機器學(xué)習(xí)和圖應(yīng)用中常用的迭代算法(每一步對數(shù)據(jù)執(zhí)行相似的函數(shù)),;(2)交互式數(shù)據(jù)挖掘工具(用戶反復(fù)查詢一個數(shù)據(jù)子集)?;跀?shù)據(jù)流的框架并不明確支持工作集,,所以需要將數(shù)據(jù)輸出到磁盤,然后在每次查詢時重新加載,,這帶來較大的開銷,。 我們提出了一種分布式的內(nèi)存抽象,稱為彈性分布式數(shù)據(jù)集(RDD,,Resilient Distributed Datasets),。它支持基于工作集的應(yīng)用,同時具有數(shù)據(jù)流模型的特點:自動容錯,、位置感知調(diào)度和可伸縮性,。RDD允許用戶在執(zhí)行多個查詢時顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,,這極大地提升了查詢速度,。 RDD提供了一種高度受限的共享內(nèi)存模型,即RDD是只讀的記錄分區(qū)的集合,,只能通過在其他RDD執(zhí)行確定的轉(zhuǎn)換操作(如map,、join和group by)而創(chuàng)建,然而這些限制使得實現(xiàn)容錯的開銷很低,。與分布式共享內(nèi)存系統(tǒng)需要付出高昂代價的檢查點和回滾機制不同,,RDD通過Lineage來重建丟失的分區(qū):一個RDD中包含了如何從其他RDD衍生所必需的相關(guān)信息,,從而不需要檢查點操作就可以重構(gòu)丟失的數(shù)據(jù)分區(qū),。盡管RDD不是一個通用的共享內(nèi)存抽象,但卻具備了良好的描述能力,、可伸縮性和可靠性,,但卻能夠廣泛適用于數(shù)據(jù)并行類應(yīng)用。 第一個指出非循環(huán)數(shù)據(jù)流存在不足的并非是我們,,例如,,Google的Pregel[21],是一種專門用于迭代式圖算法的編程模型,;Twister[13]和HaLoop[8],,是兩種典型的迭代式MapReduce模型。但是,,對于一些特定類型的應(yīng)用,,這些系統(tǒng)提供了一個受限的通信模型。相比之下,,RDD則為基于工作集的應(yīng)用提供了更為通用的抽象,,用戶可以對中間結(jié)果進(jìn)行顯式的命名和物化,,控制其分區(qū),還能執(zhí)行用戶選擇的特定操作(而不是在運行時去循環(huán)執(zhí)行一系列MapReduce步驟),。RDD可以用來描述Pregel,、迭代式MapReduce,以及這兩種模型無法描述的其他應(yīng)用,,如交互式數(shù)據(jù)挖掘工具(用戶將數(shù)據(jù)集裝入內(nèi)存,,然后執(zhí)行ad-hoc查詢)。 Spark是我們實現(xiàn)的RDD系統(tǒng),,在我們內(nèi)部能夠被用于開發(fā)多種并行應(yīng)用,。Spark采用Scala語言[5]實現(xiàn),提供類似于DryadLINQ的集成語言編程接口[34],,使用戶可以非常容易地編寫并行任務(wù),。此外,隨著Scala新版本解釋器的完善,,Spark還能夠用于交互式查詢大數(shù)據(jù)集,。我們相信Spark會是第一個能夠使用有效、通用編程語言,,并在集群上對大數(shù)據(jù)集進(jìn)行交互式分析的系統(tǒng),。 我們通過微基準(zhǔn)和用戶應(yīng)用程序來評估RDD。實驗表明,,在處理迭代式應(yīng)用上Spark比Hadoop快高達(dá)20多倍,,計算數(shù)據(jù)分析類報表的性能提高了40多倍,同時能夠在5-7秒的延時內(nèi)交互式掃描1TB數(shù)據(jù)集,。此外,,我們還在Spark之上實現(xiàn)了Pregel和HaLoop編程模型(包括其位置優(yōu)化策略),以庫的形式實現(xiàn)(分別使用了100和200行Scala代碼),。最后,,利用RDD內(nèi)在的確定性特性,我們還創(chuàng)建了一種Spark調(diào)試工具rddbg,,允許用戶在任務(wù)期間利用Lineage重建RDD,,然后像傳統(tǒng)調(diào)試器那樣重新執(zhí)行任務(wù)。 本文首先在第2部分介紹了RDD的概念,,然后第3部分描述Spark API,,第4部分解釋如何使用RDD表示幾種并行應(yīng)用(包括Pregel和HaLoop),第5部分討論Spark中RDD的表示方法以及任務(wù)調(diào)度器,,第6部分描述具體實現(xiàn)和rddbg,,第7部分對RDD進(jìn)行評估,第8部分給出了相關(guān)研究工作,,最后第9部分總結(jié),。 2.彈性分布式數(shù)據(jù)集(RDD)本部分描述RDD和編程模型,。首先討論設(shè)計目標(biāo)(2.1),然后定義RDD(2.2),,討論Spark的編程模型(2.3),,并給出一個示例(2.4),最后對比RDD與分布式共享內(nèi)存(2.5),。 2.1 目標(biāo)和概述我們的目標(biāo)是為基于工作集的應(yīng)用(即多個并行操作重用中間結(jié)果的這類應(yīng)用)提供抽象,,同時保持MapReduce及其相關(guān)模型的優(yōu)勢特性:即自動容錯、位置感知性調(diào)度和可伸縮性,。RDD比數(shù)據(jù)流模型更易于編程,,同時基于工作集的計算也具有良好的描述能力。 在這些特性中,,最難實現(xiàn)的是容錯性,。一般來說,分布式數(shù)據(jù)集的容錯性有兩種方式:即數(shù)據(jù)檢查點和記錄數(shù)據(jù)的更新,。我們面向的是大規(guī)模數(shù)據(jù)分析,,數(shù)據(jù)檢查點操作成本很高:需要通過數(shù)據(jù)中心的網(wǎng)絡(luò)連接在機器之間復(fù)制龐大的數(shù)據(jù)集,而網(wǎng)絡(luò)帶寬往往比內(nèi)存帶寬低得多,,同時還需要消耗更多的存儲資源(在內(nèi)存中復(fù)制數(shù)據(jù)可以減少需要緩存的數(shù)據(jù)量,,而存儲到磁盤則會拖慢應(yīng)用程序)。所以,,我們選擇記錄更新的方式,。但是,如果更新太多,,那么記錄更新成本也不低,。因此,RDD只支持粗粒度轉(zhuǎn)換,,即在大量記錄上執(zhí)行的單個操作,。將創(chuàng)建RDD的一系列轉(zhuǎn)換記錄下來(即Lineage),,以便恢復(fù)丟失的分區(qū),。 雖然只支持粗粒度轉(zhuǎn)換限制了編程模型,但我們發(fā)現(xiàn)RDD仍然可以很好地適用于很多應(yīng)用,,特別是支持?jǐn)?shù)據(jù)并行的批量分析應(yīng)用,,包括數(shù)據(jù)挖掘、機器學(xué)習(xí),、圖算法等,,因為這些程序通常都會在很多記錄上執(zhí)行相同的操作。RDD不太適合那些異步更新共享狀態(tài)的應(yīng)用,,例如并行web爬行器,。因此,,我們的目標(biāo)是為大多數(shù)分析型應(yīng)用提供有效的編程模型,而其他類型的應(yīng)用交給專門的系統(tǒng),。 2.2 RDD抽象RDD是只讀的,、分區(qū)記錄的集合。RDD只能基于在穩(wěn)定物理存儲中的數(shù)據(jù)集和其他已有的RDD上執(zhí)行確定性操作來創(chuàng)建,。這些確定性操作稱之為轉(zhuǎn)換,,如map、filter,、groupBy,、join(轉(zhuǎn)換不是程開發(fā)人員在RDD上執(zhí)行的操作)。 RDD不需要物化,。RDD含有如何從其他RDD衍生(即計算)出本RDD的相關(guān)信息(即Lineage),,據(jù)此可以從物理存儲的數(shù)據(jù)計算出相應(yīng)的RDD分區(qū)。 2.3 編程模型在Spark中,,RDD被表示為對象,,通過這些對象上的方法(或函數(shù))調(diào)用轉(zhuǎn)換。 定義RDD之后,,程序員就可以在動作中使用RDD了,。動作是向應(yīng)用程序返回值,或向存儲系統(tǒng)導(dǎo)出數(shù)據(jù)的那些操作,,例如,,count(返回RDD中的元素個數(shù)),collect(返回元素本身),,save(將RDD輸出到存儲系統(tǒng)),。在Spark中,只有在動作第一次使用RDD時,,才會計算RDD(即延遲計算),。這樣在構(gòu)建RDD的時候,運行時通過管道的方式傳輸多個轉(zhuǎn)換,。 程序員還可以從兩個方面控制RDD,,即緩存和分區(qū)。用戶可以請求將RDD緩存,,這樣運行時將已經(jīng)計算好的RDD分區(qū)存儲起來,,以加速后期的重用。緩存的RDD一般存儲在內(nèi)存中,,但如果內(nèi)存不夠,,可以寫到磁盤上。 另一方面,,RDD還允許用戶根據(jù)關(guān)鍵字(key)指定分區(qū)順序,,這是一個可選的功能,。目前支持哈希分區(qū)和范圍分區(qū)。例如,,應(yīng)用程序請求將兩個RDD按照同樣的哈希分區(qū)方式進(jìn)行分區(qū)(將同一機器上具有相同關(guān)鍵字的記錄放在一個分區(qū)),,以加速它們之間的join操作。在Pregel和HaLoop中,,多次迭代之間采用一致性的分區(qū)置換策略進(jìn)行優(yōu)化,,我們同樣也允許用戶指定這種優(yōu)化。 2.4 示例:控制臺日志挖掘本部分我們通過一個具體示例來闡述RDD,。假定有一個大型網(wǎng)站出錯,,操作員想要檢查Hadoop文件系統(tǒng)(HDFS)中的日志文件(TB級大小)來找出原因,。通過使用Spark,,操作員只需將日志中的錯誤信息裝載到一組節(jié)點的內(nèi)存中,然后執(zhí)行交互式查詢,。首先,,需要在Spark解釋器中輸入如下Scala命令: 第1行從HDFS文件定義了一個RDD(即一個文本行集合),第2行獲得一個過濾后的RDD,,第3行請求將errors緩存起來,。注意在Scala語法中filter的參數(shù)是一個閉包。 這時集群還沒有開始執(zhí)行任何任務(wù),。但是,,用戶已經(jīng)可以在這個RDD上執(zhí)行對應(yīng)的動作,例如統(tǒng)計錯誤消息的數(shù)目:
用戶還可以在RDD上執(zhí)行更多的轉(zhuǎn)換操作,,并使用轉(zhuǎn)換結(jié)果,,如:
使用errors的第一個action運行以后,Spark會把errors的分區(qū)緩存在內(nèi)存中,,極大地加快了后續(xù)計算速度,。注意,最初的RDD lines不會被緩存,。因為錯誤信息可能只占原數(shù)據(jù)集的很小一部分(小到足以放入內(nèi)存),。 2.5 RDD與分布式共享內(nèi)存為了進(jìn)一步理解RDD是一種分布式的內(nèi)存抽象,表1列出了RDD與分布式共享內(nèi)存(DSM,,Distributed Shared Memory)[24]的對比,。在DSM系統(tǒng)中,應(yīng)用可以向全局地址空間的任意位置進(jìn)行讀寫操作,。(注意這里的DSM,,不僅指傳統(tǒng)的共享內(nèi)存系統(tǒng),還包括那些通過分布式哈希表或分布式文件系統(tǒng)進(jìn)行數(shù)據(jù)共享的系統(tǒng),,比如Piccolo[28])DSM是一種通用的抽象,,但這種通用性同時也使得在商用集群上實現(xiàn)有效的容錯性更加困難。 RDD與DSM主要區(qū)別在于,,不僅可以通過批量轉(zhuǎn)換創(chuàng)建(即“寫”)RDD,,還可以對任意內(nèi)存位置讀寫。也就是說,,RDD限制應(yīng)用執(zhí)行批量寫操作,,這樣有利于實現(xiàn)有效的容錯。特別地,,RDD沒有檢查點開銷,,因為可以使用Lineage來恢復(fù)RDD。而且,,失效時只需要重新計算丟失的那些RDD分區(qū),,可以在不同節(jié)點上并行執(zhí)行,而不需要回滾整個程序,。
注意,,通過備份任務(wù)的拷貝,,RDD還可以處理落后任務(wù)(即運行很慢的節(jié)點),這點與MapReduce[12]類似,。而DSM則難以實現(xiàn)備份任務(wù),,因為任務(wù)及其副本都需要讀寫同一個內(nèi)存位置。 與DSM相比,RDD模型有兩個好處,。第一,,對于RDD中的批量操作,運行時將根據(jù)數(shù)據(jù)存放的位置來調(diào)度任務(wù),,從而提高性能,。第二,對于基于掃描的操作,,如果內(nèi)存不足以緩存整個RDD,,就進(jìn)行部分緩存。把內(nèi)存放不下的分區(qū)存儲到磁盤上,,此時性能與現(xiàn)有的數(shù)據(jù)流系統(tǒng)差不多,。 最后看一下讀操作的粒度。RDD上的很多動作(如count和collect)都是批量讀操作,,即掃描整個數(shù)據(jù)集,,可以將任務(wù)分配到距離數(shù)據(jù)最近的節(jié)點上。同時,,RDD也支持細(xì)粒度操作,,即在哈希或范圍分區(qū)的RDD上執(zhí)行關(guān)鍵字查找,。 3. Spark編程接口Spark用Scala[5]語言實現(xiàn)了RDD的API,。Scala是一種基于JVM的靜態(tài)類型、函數(shù)式,、面向?qū)ο蟮恼Z言,。我們選擇Scala是因為它簡潔(特別適合交互式使用)、有效(因為是靜態(tài)類型),。但是,,RDD抽象并不局限于函數(shù)式語言,也可以使用其他語言來實現(xiàn)RDD,,比如像Hadoop[2]那樣用類表示用戶函數(shù),。 要使用Spark,開發(fā)者需要編寫一個driver程序,,連接到集群以運行Worker,,如圖2所示。Driver定義了一個或多個RDD,,并調(diào)用RDD上的動作,。Worker是長時間運行的進(jìn)程,將RDD分區(qū)以Java對象的形式緩存在內(nèi)存中,。 再看看2.4中的例子,,用戶執(zhí)行RDD操作時會提供參數(shù),,比如map傳遞一個閉包(closure,,函數(shù)式編程中的概念)。Scala將閉包表示為Java對象,,如果傳遞的參數(shù)是閉包,,則這些對象被序列化,通過網(wǎng)絡(luò)傳輸?shù)狡渌?jié)點上進(jìn)行裝載,。Scala將閉包內(nèi)的變量保存為Java對象的字段,。例如,var x = 5; rdd.map(_ + x) 這段代碼將RDD中的每個元素加5,??偟膩碚f,Spark的語言集成類似于DryadLINQ,。 RDD本身是靜態(tài)類型對象,,由參數(shù)指定其元素類型。例如,,RDD[int]是一個整型RDD,。不過,我們舉的例子幾乎都省略了這個類型參數(shù),,因為Scala支持類型推斷,。 雖然在概念上使用Scala實現(xiàn)RDD很簡單,但還是要處理一些Scala閉包對象的反射問題,。如何通過Scala解釋器來使用Spark還需要更多工作,,這點我們將在第6部分討論。不管怎樣,,我們都不需要修改Scala編譯器,。 3.1 Spark中的RDD操作表2列出了Spark中的RDD轉(zhuǎn)換和動作。每個操作都給出了標(biāo)識,,其中方括號表示類型參數(shù),。前面說過轉(zhuǎn)換是延遲操作,用于定義新的RDD,;而動作啟動計算操作,,并向用戶程序返回值或向外部存儲寫數(shù)據(jù)。
注意,,有些操作只對鍵值對可用,,比如join,。另外,函數(shù)名與Scala及其他函數(shù)式語言中的API匹配,,例如map是一對一的映射,,而flatMap是將每個輸入映射為一個或多個輸出(與MapReduce中的map類似)。 除了這些操作以外,,用戶還可以請求將RDD緩存起來,。而且,用戶還可以通過Partitioner類獲取RDD的分區(qū)順序,,然后將另一個RDD按照同樣的方式分區(qū),。有些操作會自動產(chǎn)生一個哈希或范圍分區(qū)的RDD,,像groupByKey,,reduceByKey和sort等。 4. 應(yīng)用程序示例現(xiàn)在我們講述如何使用RDD表示幾種基于數(shù)據(jù)并行的應(yīng)用,。首先討論一些迭代式機器學(xué)習(xí)應(yīng)用(4.1),,然后看看如何使用RDD描述幾種已有的集群編程模型,即MapReduce(4.2),,Pregel(4.3),,和Hadoop(4.4),。最后討論一下RDD不適合哪些應(yīng)用(4.5),。 4.1 迭代式機器學(xué)習(xí)很多機器學(xué)習(xí)算法都具有迭代特性,,運行迭代優(yōu)化方法來優(yōu)化某個目標(biāo)函數(shù),,例如梯度下降方法,。如果這些算法的工作集能夠放入內(nèi)存,將極大地加速程序運行,。而且,,這些算法通常采用批量操作,,例如映射和求和,這樣更容易使用RDD來表示,。 例如下面的程序是邏輯回歸[15]的實現(xiàn),。邏輯回歸是一種常見的分類算法,,即尋找一個最佳分割兩組點(即垃圾郵件和非垃圾郵件)的超平面w。算法采用梯度下降的方法:開始時w為隨機值,,在每一次迭代的過程中,,對w的函數(shù)求和,,然后朝著優(yōu)化的方向移動w,。
首先定義一個名為points的緩存RDD,這是在文本文件上執(zhí)行map轉(zhuǎn)換之后得到的,,即將每個文本行解析為一個Point對象,。然后在points上反復(fù)執(zhí)行map和reduce操作,,每次迭代時通過對當(dāng)前w的函數(shù)進(jìn)行求和來計算梯度。7.1小節(jié)我們將看到這種在內(nèi)存中緩存points的方式,,比每次迭代都從磁盤文件裝載數(shù)據(jù)并進(jìn)行解析要快得多,。 已經(jīng)在Spark中實現(xiàn)的迭代式機器學(xué)習(xí)算法還有:kmeans(像邏輯回歸一樣每次迭代時執(zhí)行一對map和reduce操作),期望最大化算法(EM,,兩個不同的map/reduce步驟交替執(zhí)行),,交替最小二乘矩陣分解和協(xié)同過濾算法。Chu等人提出迭代式MapReduce也可以用來實現(xiàn)常用的學(xué)習(xí)算法[11],。 4.2 使用RDD實現(xiàn)MapReduceMapReduce模型[12]很容易使用RDD進(jìn)行描述,。假設(shè)有一個輸入數(shù)據(jù)集(其元素類型為T),和兩個函數(shù)myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],,代碼如下:
如果任務(wù)包含combiner,,則相應(yīng)的代碼為:
ReduceByKey操作在mapper節(jié)點上執(zhí)行部分聚集,與MapReduce的combiner類似,。 4.3 使用RDD實現(xiàn)PregelPregel[21]是面向圖算法的基于BSP范式[32]的編程模型,。程序由一系列超步(Superstep)協(xié)調(diào)迭代運行。在每個超步中,,各個頂點執(zhí)行用戶函數(shù),,并更新相應(yīng)的頂點狀態(tài),,變異圖拓?fù)洌缓笙蛳乱粋€超步的頂點集發(fā)送消息,。這種模型能夠描述很多圖算法,,包括最短路徑,雙邊匹配和PageRank等,。 以PageRank為例介紹一下Pregel的實現(xiàn),。當(dāng)前PageRank[7]記為r,頂點表示狀態(tài),。在每個超步中,,各個頂點向其所有鄰居發(fā)送貢獻(xiàn)值r/n,這里n是鄰居的數(shù)目,。下一個超步開始時,,每個頂點將其分值(rank)更新為 α/N + (1 - α) * Σci,這里的求和是各個頂點收到的所有貢獻(xiàn)值的和,,N是頂點的總數(shù),。 Pregel將輸入的圖劃分到各個worker上,并存儲在其內(nèi)存中,。在每個超步中,,各個worker通過一種類似MapReduce的Shuffle操作交換消息。 Pregel的通信模式可以用RDD來描述,,如圖3,。主要思想是:將每個超步中的頂點狀態(tài)和要發(fā)送的消息存儲為RDD,然后根據(jù)頂點ID分組,,進(jìn)行Shuffle通信(即cogroup操作),。然后對每個頂點ID上的狀態(tài)和消息應(yīng)用用戶函數(shù)(即mapValues操作),產(chǎn)生一個新的RDD,,即(VertexID, (NewState, OutgoingMessages)),。然后執(zhí)行map操作分離出下一次迭代的頂點狀態(tài)和消息(即mapValues和flatMap操作)。代碼如下:
完整的Pregel編程模型還包括其他工具,,比如combiner,,附錄A討論了它們的實現(xiàn)。下面將討論Pregel的容錯性,,以及如何在實現(xiàn)相同容錯性的同時減少需要執(zhí)行檢查點操作的數(shù)據(jù)量,。 我們差不多用了100行Scala代碼在Spark上實現(xiàn)了一個類Pregel的API。7.2小節(jié)將使用PageRank算法評估它的性能,。 4.3.1 Pregel容錯當(dāng)前,,Pregel基于檢查點機制來為頂點狀態(tài)及其消息實現(xiàn)容錯[21]。然而作者是這樣描述的:通過在其它的節(jié)點上記錄已發(fā)消息日志,,然后單獨重建丟失的分區(qū),,只需要恢復(fù)局部數(shù)據(jù)即可。上面提到這兩種方式,,RDD都能夠很好地支持,。 通過4.3小節(jié)的實現(xiàn),Spark總是能夠基于Lineage實現(xiàn)頂點和消息RDD的重建,,但是由于過長的Lineage鏈,恢復(fù)可能會付出高昂的代價,。因為迭代RDD依賴于上一個RDD,,對于部分分區(qū)來說,節(jié)點故障可能會導(dǎo)致這些分區(qū)狀態(tài)的所有迭代版本丟失,,這就要求使用一種“級聯(lián)-重新執(zhí)行”[20]的方式去依次重建每一個丟失的分區(qū),。為了避免這個問題,用戶可以周期性地在頂點和消息RDD上執(zhí)行save操作,,將狀態(tài)信息保存到持久存儲中,。然后,Spark能夠在失敗的時候自動地重新計算這些丟失的分區(qū)(而不是回滾整個程序),。 最后,,我們意識到,RDD也能夠?qū)崿F(xiàn)檢查點數(shù)據(jù)的reduce操作,,這要求通過一種高效的檢查點方案來表達(dá)檢查點數(shù)據(jù),。在很多Pregel作業(yè)中,頂點狀態(tài)都包括可變與不可變的組件,,例如,,在PageRank中,與一個頂點相鄰的頂點列表是不可變的,,但是它們的排名是可變的,,在這種情況下,,我們可以使用一個來自可變數(shù)據(jù)的單獨RDD來替換不可變RDD,基于這樣一個較短的Lineage鏈,,檢查點僅僅是可變狀態(tài),,圖4解釋了這種方式。 4.4 使用RDD實現(xiàn)HaLoopHaLoop[8]是Hadoop的一個擴(kuò)展版本,,它能夠改善具有迭代特性的MapReduce程序的性能?;贖aLoop編程模型的應(yīng)用,,使用reduce階段的輸出作為map階段下一輪迭代的輸入。它的循環(huán)感知任務(wù)調(diào)度器能夠保證,,在每一輪迭代中處理同一個分區(qū)數(shù)據(jù)的連續(xù)map和reduce任務(wù),,一定能夠在同一臺物理機上執(zhí)行。確保迭代間locality特性,,reduce數(shù)據(jù)在物理節(jié)點之間傳輸,,并且允許數(shù)據(jù)緩存在本地磁盤而能夠被后續(xù)迭代重用。 使用RDD來優(yōu)化HaLoop,,我們在Spark上實現(xiàn)了一個類似HaLoop的API,,這個庫只使用了200行Scala代碼。通過partitionBy能夠保證跨迭代的分區(qū)的一致性,,每一個階段的輸入和輸出被緩存以用于后續(xù)迭代,。 4.5 不適合使用RDD的應(yīng)用在2.1節(jié)我們討論過,RDD適用于具有批量轉(zhuǎn)換需求的應(yīng)用,,并且相同的操作作用于數(shù)據(jù)集的每一個元素上,。在這種情況下,RDD能夠記住每個轉(zhuǎn)換操作,,對應(yīng)于Lineage圖中的一個步驟,,恢復(fù)丟失分區(qū)數(shù)據(jù)時不需要寫日志記錄大量數(shù)據(jù)。RDD不適合那些通過異步細(xì)粒度地更新來共享狀態(tài)的應(yīng)用,,例如Web應(yīng)用中的存儲系統(tǒng),,或者增量抓取和索引Web數(shù)據(jù)的系統(tǒng),這樣的應(yīng)用更適合使用一些傳統(tǒng)的方法,例如數(shù)據(jù)庫,、RAMCloud[26],、Percolator[27]和Piccolo[28]。我們的目標(biāo)是,,面向批量分析應(yīng)用的這類特定系統(tǒng),,提供一種高效的編程模型,而不是一些異步應(yīng)用程序,。 5. RDD的描述及作業(yè)調(diào)度我們希望在不修改調(diào)度器的前提下,,支持RDD上的各種轉(zhuǎn)換操作,同時能夠從這些轉(zhuǎn)換獲取Lineage信息,。為此,,我們?yōu)镽DD設(shè)計了一組小型通用的內(nèi)部接口。 簡單地說,,每個RDD都包含:(1)一組RDD分區(qū)(partition,,即數(shù)據(jù)集的原子組成部分);(2)對父RDD的一組依賴,,這些依賴描述了RDD的Lineage,;(3)一個函數(shù),即在父RDD上執(zhí)行何種計算,;(4)元數(shù)據(jù),,描述分區(qū)模式和數(shù)據(jù)存放的位置。例如,,一個表示HDFS文件的RDD包含:各個數(shù)據(jù)塊的一個分區(qū),,并知道各個數(shù)據(jù)塊放在哪些節(jié)點上。而且這個RDD上的map操作結(jié)果也具有同樣的分區(qū),,map函數(shù)是在父數(shù)據(jù)上執(zhí)行的。表3總結(jié)了RDD的內(nèi)部接口,。
設(shè)計接口的一個關(guān)鍵問題就是,,如何表示RDD之間的依賴,。我們發(fā)現(xiàn)RDD之間的依賴關(guān)系可以分為兩類,即:(1)窄依賴(narrow dependencies):子RDD的每個分區(qū)依賴于常數(shù)個父分區(qū)(即與數(shù)據(jù)規(guī)模無關(guān)),;(2)寬依賴(wide dependencies):子RDD的每個分區(qū)依賴于所有父RDD分區(qū),。例如,map產(chǎn)生窄依賴,,而join則是寬依賴(除非父RDD被哈希分區(qū)),。另一個例子見圖5。 通過RDD接口,Spark只需要不超過20行代碼實現(xiàn)便可以實現(xiàn)大多數(shù)轉(zhuǎn)換,。5.1小節(jié)給出了例子,,然后我們討論了怎樣使用RDD接口進(jìn)行調(diào)度(5.2),最后討論一下基于RDD的程序何時需要數(shù)據(jù)檢查點操作(5.3),。 5.1 RDD實現(xiàn)舉例HDFS文件:目前為止我們給的例子中輸入RDD都是HDFS文件,,對這些RDD可以執(zhí)行:partitions操作返回各個數(shù)據(jù)塊的一個分區(qū)(每個Partition對象中保存數(shù)據(jù)塊的偏移),preferredLocations操作返回數(shù)據(jù)塊所在的節(jié)點列表,,iterator操作對數(shù)據(jù)塊進(jìn)行讀取,。 map:任何RDD上都可以執(zhí)行map操作,返回一個MappedRDD對象,。該操作傳遞一個函數(shù)參數(shù)給map,,對父RDD上的記錄按照iterator的方式執(zhí)行這個函數(shù),并返回一組符合條件的父RDD分區(qū)及其位置,。 union:在兩個RDD上執(zhí)行union操作,,返回兩個父RDD分區(qū)的并集。通過相應(yīng)父RDD上的窄依賴關(guān)系計算每個子RDD分區(qū)(注意union操作不會過濾重復(fù)值,,相當(dāng)于SQL中的UNION ALL),。 sample:抽樣與映射類似,但是sample操作中,,RDD需要存儲一個隨機數(shù)產(chǎn)生器的種子,,這樣每個分區(qū)能夠確定哪些父RDD記錄被抽樣。 join:對兩個RDD執(zhí)行join操作可能產(chǎn)生窄依賴(如果這兩個RDD擁有相同的哈希分區(qū)或范圍分區(qū)),可能是寬依賴,,也可能兩種依賴都有(比如一個父RDD有分區(qū),,而另一父RDD沒有)。 5.2 Spark任務(wù)調(diào)度器調(diào)度器根據(jù)RDD的結(jié)構(gòu)信息為每個動作確定有效的執(zhí)行計劃,。調(diào)度器的接口是runJob函數(shù),,參數(shù)為RDD及其分區(qū)集,和一個RDD分區(qū)上的函數(shù),。該接口足以表示Spark中的所有動作(即count,、collect、save等),。 總的來說,,我們的調(diào)度器跟Dryad類似,但我們還考慮了哪些RDD分區(qū)是緩存在內(nèi)存中的,。調(diào)度器根據(jù)目標(biāo)RDD的Lineage圖創(chuàng)建一個由stage構(gòu)成的無回路有向圖(DAG),。每個stage內(nèi)部盡可能多地包含一組具有窄依賴關(guān)系的轉(zhuǎn)換,并將它們流水線并行化(pipeline),。stage的邊界有兩種情況:一是寬依賴上的Shuffle操作,;二是已緩存分區(qū),它可以縮短父RDD的計算過程,。例如圖6,。父RDD完成計算后,可以在stage內(nèi)啟動一組任務(wù)計算丟失的分區(qū),。 調(diào)度器根據(jù)數(shù)據(jù)存放的位置分配任務(wù),以最小化通信開銷,。如果某個任務(wù)需要處理一個已緩存分區(qū),,則直接將任務(wù)分配給擁有這個分區(qū)的節(jié)點。否則,,如果需要處理的分區(qū)位于多個可能的位置(例如,,由HDFS的數(shù)據(jù)存放位置決定),則將任務(wù)分配給這一組節(jié)點。 對于寬依賴(例如需要Shuffle的依賴),,目前的實現(xiàn)方式是,,在擁有父分區(qū)的節(jié)點上將中間結(jié)果物化,簡化容錯處理,,這跟MapReduce中物化map輸出很像,。 如果某個任務(wù)失效,只要stage中的父RDD分區(qū)可用,,則只需在另一個節(jié)點上重新運行這個任務(wù)即可,。如果某些stage不可用(例如,Shuffle時某個map輸出丟失),,則需要重新提交這個stage中的所有任務(wù)來計算丟失的分區(qū),。 最后,lookup動作允許用戶從一個哈?;蚍秶謪^(qū)的RDD上,,根據(jù)關(guān)鍵字讀取一個數(shù)據(jù)元素。這里有一個設(shè)計問題,。Driver程序調(diào)用lookup時,,只需要使用當(dāng)前調(diào)度器接口計算關(guān)鍵字所在的那個分區(qū)。當(dāng)然任務(wù)也可以在集群上調(diào)用lookup,,這時可以將RDD視為一個大的分布式哈希表,。這種情況下,任務(wù)和被查詢的RDD之間的并沒有明確的依賴關(guān)系(因為worker執(zhí)行的是lookup),,如果所有節(jié)點上都沒有相應(yīng)的緩存分區(qū),,那么任務(wù)需要告訴調(diào)度器計算哪些RDD來完成查找操作。 5.3 檢查點盡管RDD中的Lineage信息可以用來故障恢復(fù),,但對于那些Lineage鏈較長的RDD來說,,這種恢復(fù)可能很耗時。例如4.3小節(jié)中的Pregel任務(wù),,每次迭代的頂點狀態(tài)和消息都跟前一次迭代有關(guān),,所以Lineage鏈很長。如果將Lineage鏈存到物理存儲中,,再定期對RDD執(zhí)行檢查點操作就很有效,。 一般來說,Lineage鏈較長,、寬依賴的RDD需要采用檢查點機制,。這種情況下,集群的節(jié)點故障可能導(dǎo)致每個父RDD的數(shù)據(jù)塊丟失,,因此需要全部重新計算[20],。將窄依賴的RDD數(shù)據(jù)存到物理存儲中可以實現(xiàn)優(yōu)化,,例如前面4.1小節(jié)邏輯回歸的例子,將數(shù)據(jù)點和不變的頂點狀態(tài)存儲起來,,就不再需要檢查點操作,。 當(dāng)前Spark版本提供檢查點API,但由用戶決定是否需要執(zhí)行檢查點操作,。今后我們將實現(xiàn)自動檢查點,,根據(jù)成本效益分析確定RDD Lineage圖中的最佳檢查點位置。 值得注意的是,,因為RDD是只讀的,,所以不需要任何一致性維護(hù)(例如寫復(fù)制策略,分布式快照或者程序暫停等)帶來的開銷,,后臺執(zhí)行檢查點操作,。 我們使用10000行Scala代碼實現(xiàn)了Spark。系統(tǒng)可以使用任何Hadoop數(shù)據(jù)源(如HDFS,,Hbase)作為輸入,,這樣很容易與Hadoop環(huán)境集成。Spark以庫的形式實現(xiàn),,不需要修改Scala編譯器,。 這里討論關(guān)于實現(xiàn)的三方面問題:(1)修改Scala解釋器,允許交互模式使用Spark(6.1),;(2)緩存管理(6.2),;(3)調(diào)試工具rddbg(6.3)。 6. 實現(xiàn)6.1 解釋器的集成像Ruby和Python一樣,,Scala也有一個交互式shell,。基于內(nèi)存的數(shù)據(jù)可以實現(xiàn)低延時,,我們希望允許用戶從解釋器交互式地運行Spark,,從而在大數(shù)據(jù)集上實現(xiàn)大規(guī)模并行數(shù)據(jù)挖掘。 Scala解釋器通常根據(jù)將用戶輸入的代碼行,,來對類進(jìn)行編譯,,接著裝載到JVM中,然后調(diào)用類的函數(shù),。這個類是一個包含輸入行變量或函數(shù)的單例對象,,并在一個初始化函數(shù)中運行這行代碼。例如,,如果用戶輸入代碼var x = 5,,接著又輸入println(x),則解釋器會定義一個包含x的Line1類,,并將第2行編譯為println(Line1.getInstance().x),。 在Spark中我們對解釋器做了兩點改動:
6.2 緩存管理Worker節(jié)點將RDD分區(qū)以Java對象的形式緩存在內(nèi)存中,。由于大部分操作是基于掃描的,,采取RDD級的LRU(最近最少使用)替換策略(即不會為了裝載一個RDD分區(qū)而將同一RDD的其他分區(qū)替換出去)。目前這種簡單的策略適合大多數(shù)用戶應(yīng)用,。另外,,使用帶參數(shù)的cache操作可以設(shè)定RDD的緩存優(yōu)先級。 6.3 rddbg:RDD程序的調(diào)試工具RDD的初衷是為了實現(xiàn)容錯以能夠再計算(re-computation),,這個特性使得調(diào)試更容易,。我們創(chuàng)建了一個名為rddbg的調(diào)試工具,它是通過基于程序記錄的Lineage信息來實現(xiàn)的,,允許用戶:(1)重建任何由程序創(chuàng)建的RDD,,并執(zhí)行交互式查詢;(2)使用一個單進(jìn)程Java調(diào)試器(如jdb)傳入計算好的RDD分區(qū),,能夠重新運行作業(yè)中的任何任務(wù),。 我們強調(diào)一下,rddbg不是一個完全重放的調(diào)試器:特別是不對非確定性的代碼或動作進(jìn)行重放,。但如果某個任務(wù)一直運行很慢(比如由于數(shù)據(jù)分布不均勻或者異常輸入等原因),,仍然可以用它來幫助找到其中的邏輯錯誤和性能問題。 舉個例子,,我們使用rddbg去解決用戶Spam分類作業(yè)中的一個bug,,這個作業(yè)中的每次迭代都產(chǎn)生0值。在調(diào)試器中重新執(zhí)行reduce任務(wù),,很快就能發(fā)現(xiàn),,輸入的權(quán)重向量(存儲在一個用戶自定義的向量類中)竟然是空值,。由于從一個未初始化的稀疏向量中讀取總是返回0,運行時也不會拋出異常,。在這個向量類中設(shè)置一個斷點,,然后運行這個任務(wù),引導(dǎo)程序很快就運行到設(shè)置的斷點處,,我們發(fā)現(xiàn)向量類的一個數(shù)組字段的值為空,,我們診斷出了這個bug:稀疏向量類中的數(shù)據(jù)字段被錯誤地使用transient來修飾,導(dǎo)致序列化時忽略了該字段的數(shù)據(jù),。 rddbg給程序執(zhí)行帶來的開銷很小,。程序本來就需要將各個RDD中的所有閉包序列化并通過網(wǎng)絡(luò)傳送,只不過使用rddbg同時還要將這些閉集記錄到磁盤,。 7. 評估我們在Amazon EC2[1]上進(jìn)行了一系列實驗來評估Spark及RDD的性能,,并與Hadoop及其他應(yīng)用程序的基準(zhǔn)進(jìn)行了對比??偟恼f來,,結(jié)果如下: 7.1 可迭代的機器學(xué)習(xí)應(yīng)用我們實現(xiàn)了2個迭代式機器學(xué)習(xí)(ML)應(yīng)用,,Logistic回歸和K-means算法,,與如下系統(tǒng)進(jìn)行性能對比:
我們使用同一數(shù)據(jù)集在相同條件下運行Logistic回歸和K-means算法:使用400個任務(wù)(每個任務(wù)處理的輸入數(shù)據(jù)塊大小為256M),,在25-100臺機器,執(zhí)行10次迭代處理100G輸入數(shù)據(jù)集(表4),。兩個作業(yè)的關(guān)鍵區(qū)別在于每輪迭代單個字節(jié)的計算量不同,。K-means的迭代時間取決于更新聚類坐標(biāo)耗時,Logistic回歸是非計算密集型的,,但是在序列化和解析過程中非常耗時,。
首輪迭代,。在首輪迭代過程中,三個系統(tǒng)都是從HDFS中讀取文本數(shù)據(jù)作為輸入,。圖9中“First Iteration”顯示了首輪迭代的柱狀圖,,實驗中Spark快于Hadoop,主要是因為Hadoop中的各個分布式組件基于心跳協(xié)議來發(fā)送信號帶來了開銷,。HadoopBinMem是最慢的,,因為它通過一個額外的MapReduce作業(yè)將數(shù)據(jù)轉(zhuǎn)換成二進(jìn)制格式。 后續(xù)迭代。圖9顯示了后續(xù)迭代的平均耗時,,圖8對比了不同聚類大小條件下耗時情況,,我們發(fā)現(xiàn)在100個節(jié)點上運行Logistic回歸程序,Spark比Hadoop,、HadoopBinMem分別快25.3,、20.7倍。從圖8(b)可以看到,,Spark僅僅比Hadoop,、HadoopBinMem分別快1.9,、3.2倍,這是因為K-means程序的開銷取決于計算(用更多的節(jié)點有助于提高計算速度的倍數(shù)),。 后續(xù)迭代中,,Hadoop仍然從HDFS讀取文本數(shù)據(jù)作為輸入,所以從首輪迭代開始Hadoop的迭代時間并沒有明顯的改善,。使用預(yù)先轉(zhuǎn)換的SequenceFile文件(Hadoop內(nèi)建的二進(jìn)制文件格式),,HadoopBinMem在后續(xù)迭代中節(jié)省了解析的代價,但是仍然帶來的其他的開銷,,如從HDFS讀SequenceFile文件并轉(zhuǎn)換成Java對象,。因為Spark直接讀取緩存于RDD中的Java對象,隨著聚類尺寸的線性增長,,迭代時間大幅下降,。
為了估測1,,我們運行空的Hadoop作業(yè),,僅僅執(zhí)行作業(yè)的初始化、啟動任務(wù),、清理工作就至少耗時25秒,。對于2,我們發(fā)現(xiàn)為了服務(wù)每一個HDFS數(shù)據(jù)塊,,HDFS進(jìn)行了多次復(fù)制以及計算校驗和操作,。 為了估測3,我們在單個節(jié)點上運行了微基準(zhǔn)程序,,在輸入的256M數(shù)據(jù)上計算Logistic回歸,,結(jié)果如表5所示。首先,,在內(nèi)存中的HDFS文件和本地文件的不同導(dǎo)致通過HDFS接口讀取耗時2秒,,甚至數(shù)據(jù)就在本地內(nèi)存中。其次,,文本和二進(jìn)制格式輸入的不同造成了解析耗時7秒的開銷,。最后,預(yù)解析的二進(jìn)制文件轉(zhuǎn)換為內(nèi)存中的Java對象,,耗時3秒,。每個節(jié)點處理多個塊時這些開銷都會累積起來,然而通過緩存RDD作為內(nèi)存中的Java對象,Spark只需要耗時3秒,。
7.2 PageRank通過使用存儲在HDFS上的49G Wikipedia導(dǎo)出數(shù)據(jù),,我們比較了使用RDD實現(xiàn)的Pregel與使用Hadoop計算PageRank的性能。PageRank算法通過10輪迭代處理了大約400萬文章的鏈接圖數(shù)據(jù),,圖10顯示了在30個節(jié)點上,,Spark處理速度是Hadoop的2倍多,改進(jìn)后對輸入進(jìn)行Hash分區(qū)速度提升到2.6倍,,使用Combiner后提升到3.6倍,,這些結(jié)果數(shù)據(jù)也隨著節(jié)點擴(kuò)展到60個時同步放大。 7.3 容錯恢復(fù)基于K-means算法應(yīng)用程序,,我們評估了在單點故障(SPOF)時使用Lneage信息創(chuàng)建RDD分區(qū)的開銷,。圖11顯示了,K-means應(yīng)用程序運行在75個節(jié)點的集群中進(jìn)行了10輪迭代,,我們在正常操作和進(jìn)行第6輪迭代開始時一個節(jié)點發(fā)生故障的情況下對耗時進(jìn)行了對比。沒有任何失敗,,每輪迭代啟動了400個任務(wù)處理100G數(shù)據(jù),。 7.4 內(nèi)存不足時表現(xiàn)到現(xiàn)在為止,,我們能保證集群中的每個節(jié)點都有足夠的內(nèi)存去緩存迭代過程中使用的RDD,如果沒有足夠的內(nèi)存來緩存一個作業(yè)的工作集,,Spark又是如何運行的呢,?在實驗中,我們通過在每個節(jié)點上限制緩存RDD所需要的內(nèi)存資源來配置Spark,,在不同的緩存配置條件下執(zhí)行Logistic回歸,,結(jié)果如圖12。我們可以看出,,隨著緩存的減小,,性能平緩地下降。 7.5 基于Spark構(gòu)建的用戶應(yīng)用程序In-Memory分析,。視頻分發(fā)公司Conviva使用Spark極大地提升了為客戶處理分析報告的速度,,以前基于Hadoop使用大約20個Hive[3]查詢來完成,這些查詢作用在相同的數(shù)據(jù)子集上(滿足用戶提供的條件),但是在不同分組的字段上執(zhí)行聚合操作(SUM,、AVG,、COUNT DISTINCT等)需要使用單獨的MapReduce作業(yè)。該公司使用Spark只需要將相關(guān)數(shù)據(jù)加載到內(nèi)存中一次,,然后運行上述聚合操作,,在Hadoop集群上處理200G壓縮數(shù)據(jù)并生成報耗時20小時,而使用Spark基于96G內(nèi)存的2個節(jié)點耗時30分鐘即可完成,,速度提升40倍,,主要是因為不需要再對每個作業(yè)重復(fù)地執(zhí)行解壓縮和過濾操作。 城市交通建模,。在Berkeley的Mobile Millennium項目[17]中,,基于一系列分散的汽車GPS監(jiān)測數(shù)據(jù),研究人員使用并行化機器學(xué)習(xí)算法來推算公路交通擁堵狀況,。數(shù)據(jù)來自市區(qū)10000個互聯(lián)的公路線路網(wǎng),,還有600000個由汽車GPS裝置采集到的樣本數(shù)據(jù),這些數(shù)據(jù)記錄了汽車在兩個地點之間行駛的時間(每一條路線的行駛時間可能跨多個公路線路網(wǎng)),。使用一個交通模型,,通過推算跨多個公路網(wǎng)行駛耗時預(yù)期,系統(tǒng)能夠估算擁堵狀況,。研究人員使用Spark實現(xiàn)了一個可迭代的EM算法,,其中包括向Worker節(jié)點廣播路線網(wǎng)絡(luò)信息,在E和M階段之間執(zhí)行reduceByKey操作,,應(yīng)用從20個節(jié)點擴(kuò)展到80個節(jié)點(每個節(jié)點4核),,如圖13(a)所示: 7.6 交互式數(shù)據(jù)挖掘為了展示Spark交互式處理大數(shù)據(jù)集的能力,我們在100個m2.4xlarge EC2實例(8核68G內(nèi)存)上使用Spark分析1TB從2008-10到2009-4這段時間的Wikipedia頁面瀏覽日志數(shù)據(jù),,在整個輸入數(shù)據(jù)集上簡單地查詢?nèi)缦聝?nèi)容以獲取頁面瀏覽總數(shù):(1)全部頁面,;(2)頁面的標(biāo)題能精確匹配給定的關(guān)鍵詞;(3)頁面的標(biāo)題能部分匹配給定的關(guān)鍵詞。 8. 相關(guān)工作分布式共享內(nèi)存(DSM)。RDD可以看成是一個基于DSM研究[24]得到的抽象,。在2.5節(jié)我們討論過,,RDD提供了一個比DSM限制更嚴(yán)格的編程模型,并能在節(jié)點失效時高效地重建數(shù)據(jù)集,。DSM通過檢查點[19]實現(xiàn)容錯,,而Spark使用Lineage重建RDD分區(qū),這些分區(qū)可以在不同的節(jié)點上重新并行處理,,而不需要將整個程序回退到檢查點再重新運行,。RDD能夠像MapReduce一樣將計算推向數(shù)據(jù)[12],并通過推測執(zhí)行來解決某些任務(wù)計算進(jìn)度落后的問題,,推測執(zhí)行在一般的DSM系統(tǒng)上是很難實現(xiàn)的。 In-Memory集群計算,。Piccolo[28]是一個基于可變的,、In-Memory的分布式表的集群編程模型。因為Piccolo允許讀寫表中的記錄,,它具有與DSM類似的恢復(fù)機制,,需要檢查點和回滾,但是不能推測執(zhí)行,,也沒有提供類似groupBy,、sort等更高級別的數(shù)據(jù)流算子,用戶只能直接讀取表單元數(shù)據(jù)來實現(xiàn),??梢姡琍iccolo是比Spark更低級別的編程模型,,但是比DSM要高級,。 RAMClouds[26]適合作為Web應(yīng)用的存儲系統(tǒng),它同樣提供了細(xì)粒度讀寫操作,,所以需要通過記錄日志來實現(xiàn)容錯,。 數(shù)據(jù)流系統(tǒng)。RDD借鑒了DryadLINQ[34]、Pig[25]和FlumeJava[9]的“并行收集”編程模型,,通過允許用戶顯式地將未序列化的對象保存在內(nèi)存中,,以此來控制分區(qū)和基于key隨機查找,從而有效地支持基于工作集的應(yīng)用,。RDD保留了那些數(shù)據(jù)流系統(tǒng)更高級別的編程特性,,這對那些開發(fā)人員來說也比較熟悉,而且,,RDD也能夠支持更多類型的應(yīng)用,。RDD新增的擴(kuò)展,從概念上看很簡單,,其中Spark是第一個使用了這些特性的系統(tǒng),,類似DryadLINQ編程模型,能夠有效地支持基于工作集的應(yīng)用,。 面向基于工作集的應(yīng)用,,已經(jīng)開發(fā)了一些專用系統(tǒng),像Twister[13],、HaLoop[8]實現(xiàn)了一個支持迭代的MapReduce模型,;Pregel[21],支持圖應(yīng)用的BSP計算模型,。RDD是一個更通用的抽象,,它能夠描述支持迭代的MapReduce、Pregel,,還有現(xiàn)有一些系統(tǒng)未能處理的應(yīng)用,,如交互式數(shù)據(jù)挖掘。特別地,,它能夠讓開發(fā)人員動態(tài)地選擇操作來運行在RDD上(如查看查詢的結(jié)果以決定下一步運行哪個查詢),,而不是提供一系列固定的步驟去執(zhí)行迭代,RDD還支持更多類型的轉(zhuǎn)換,。 最后,,Dremel[22]是一個低延遲查詢引擎,它面向基于磁盤存儲的大數(shù)據(jù)集,,這類數(shù)據(jù)集是把嵌套記錄數(shù)據(jù)生成基于列的格式,。這種格式的數(shù)據(jù)也能夠保存為RDD并在Spark系統(tǒng)中使用,但Spark也具備將數(shù)據(jù)加載到內(nèi)存來實現(xiàn)快速查詢的能力,。 Lineage,。我們通過參考[6]到[10]做過調(diào)研,在科學(xué)計算和數(shù)據(jù)庫領(lǐng)域,,對于一些應(yīng)用,,如需要解釋結(jié)果以及允許被重新生成,、工作流中發(fā)現(xiàn)了bug或者數(shù)據(jù)集丟失需要重新處理數(shù)據(jù),表示數(shù)據(jù)的Lineage和原始信息一直以來都是一個研究課題,。RDD提供了一個受限的編程模型,,在這個模型中使用細(xì)粒度的Lineage來表示是非常容易的,因此它可以被用于容錯,。 緩存系統(tǒng),。Nectar[14]能夠通過識別帶有程序分析的子表達(dá)式,跨DryadLINQ作業(yè)重用中間結(jié)果,,如果將這種能力加入到基于RDD的系統(tǒng)會非常有趣,。但是Nectar并沒有提供In-Memory緩存,也不能夠讓用戶顯式地控制應(yīng)該緩存那個數(shù)據(jù)集,,以及如何對其進(jìn)行分區(qū),。Ciel[23]同樣能夠記住任務(wù)結(jié)果,但不能提供In-Memory緩存并顯式控制它,。 語言迭代,。DryadLINQ[34]能夠使用LINQ獲取到表達(dá)式樹然后在集群上運行,Spark系統(tǒng)的語言集成與它很類似,。不像DryadLINQ,,Spark允許用戶顯式地跨查詢將RDD存儲到內(nèi)存中,并通過控制分區(qū)來優(yōu)化通信,。Spark支持交互式處理,,但DryadLINQ卻不支持。 關(guān)系數(shù)據(jù)庫,。從概念上看,,RDD類似于數(shù)據(jù)庫中的視圖,緩存RDD類似于物化視圖[29],。然而,數(shù)據(jù)庫像DSM系統(tǒng)一樣,,允許典型地讀寫所有記錄,,通過記錄操作和數(shù)據(jù)的日志來實現(xiàn)容錯,還需要花費額外的開銷來維護(hù)一致性,。RDD編程模型通過增加更多限制來避免這些開銷,。 9. 總結(jié)我們提出的RDD是一個面向,運行在普通商用機集群之上并行數(shù)據(jù)處理應(yīng)用的分布式內(nèi)存抽象,。RDD廣泛支持基于工作集的應(yīng)用,,包括迭代式機器學(xué)習(xí)和圖算法,還有交互式數(shù)據(jù)挖掘,,然而它保留了數(shù)據(jù)流模型中引人注目的特點,,如自動容錯恢復(fù),,處理執(zhí)行進(jìn)度落后的任務(wù),以及感知調(diào)度,。它是通過限制編程模型,,進(jìn)而允許高效地重建RDD分區(qū)來實現(xiàn)的。RDD實現(xiàn)處理迭代式作業(yè)的速度超過Hadoop大約20倍,,而且還能夠交互式查詢數(shù)百G數(shù)據(jù),。 致謝首先感謝Spark用戶,包括Timothy Hunter,、Lester Mackey,、Dilip Joseph、Jibin Zhan和Teodor Moldovan,,他們在真實的應(yīng)用中使用Spark,,提出了寶貴的建議,同時也發(fā)現(xiàn)了一些新的研究挑戰(zhàn),。這次研究離不開以下組織或團(tuán)體的大力支持:Berkeley AMP Lab創(chuàng)立贊助者Google和SAP,,AMP Lab贊助者Amazon Web Services、Cloudera,、Huawei,、IBM、Intel,、Microsoft,、NEC、NetApp和VMWare,,國家配套資金加州MICRO項目(助學(xué)金 06-152,,07-010),國家自然科學(xué)基金 (批準(zhǔn) CNS-0509559),,加州大學(xué)工業(yè)/大學(xué)合作研究項目 (UC Discovery)授予的COM07-10240,,以及自然科學(xué)和加拿大工程研究理事會。 參考[1] Amazon EC2. http://aws.amazon.com/ec2. |
|