久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

RDD:基于內(nèi)存集群計算的容錯抽象 | 簡單之美

 codingparty 2014-08-31

該論文來自Berkeley實驗室,,英文標(biāo)題為:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing,。下面的翻譯,我是基于科學(xué)網(wǎng)翻譯基礎(chǔ)上進(jìn)行優(yōu)化,、修改,、補充,這篇譯文翻譯得很不錯。在此基礎(chǔ)上,,我增加了來自英文原文的圖和表格數(shù)據(jù),,以及譯文中缺少的未翻譯的部分。如果翻譯措辭或邏輯有誤,,歡迎批評指正,。

摘要

本文提出了分布式內(nèi)存抽象的概念——彈性分布式數(shù)據(jù)集(RDD,Resilient Distributed Datasets),,它具備像MapReduce等數(shù)據(jù)流模型的容錯特性,,并且允許開發(fā)人員在大型集群上執(zhí)行基于內(nèi)存的計算。現(xiàn)有的數(shù)據(jù)流系統(tǒng)對兩種應(yīng)用的處理并不高效:一是迭代式算法,,這在圖應(yīng)用和機器學(xué)習(xí)領(lǐng)域很常見,;二是交互式數(shù)據(jù)挖掘工具。這兩種情況下,,將數(shù)據(jù)保存在內(nèi)存中能夠極大地提高性能,。為了有效地實現(xiàn)容錯,RDD提供了一種高度受限的共享內(nèi)存,,即RDD是只讀的,,并且只能通過其他RDD上的批量操作來創(chuàng)建。盡管如此,,RDD仍然足以表示很多類型的計算,,包括MapReduce和專用的迭代編程模型(如Pregel)等。我們實現(xiàn)的RDD在迭代計算方面比Hadoop快20多倍,,同時還可以在5-7秒內(nèi)交互式地查詢1TB數(shù)據(jù)集,。

1.引言

無論是工業(yè)界還是學(xué)術(shù)界,都已經(jīng)廣泛使用高級集群編程模型來處理日益增長的數(shù)據(jù),,如MapReduce和Dryad,。這些系統(tǒng)將分布式編程簡化為自動提供位置感知性調(diào)度、容錯以及負(fù)載均衡,,使得大量用戶能夠在商用集群上分析超大數(shù)據(jù)集,。

大多數(shù)現(xiàn)有的集群計算系統(tǒng)都是基于非循環(huán)的數(shù)據(jù)流模型。從穩(wěn)定的物理存儲(如分布式文件系統(tǒng))中加載記錄,,記錄被傳入由一組確定性操作構(gòu)成的DAG,,然后寫回穩(wěn)定存儲。DAG數(shù)據(jù)流圖能夠在運行時自動實現(xiàn)任務(wù)調(diào)度和故障恢復(fù),。

盡管非循環(huán)數(shù)據(jù)流是一種很強大的抽象方法,,但仍然有些應(yīng)用無法使用這種方式描述。我們就是針對這些不太適合非循環(huán)模型的應(yīng)用,,它們的特點是在多個并行操作之間重用工作數(shù)據(jù)集,。這類應(yīng)用包括:(1)機器學(xué)習(xí)和圖應(yīng)用中常用的迭代算法(每一步對數(shù)據(jù)執(zhí)行相似的函數(shù)),;(2)交互式數(shù)據(jù)挖掘工具(用戶反復(fù)查詢一個數(shù)據(jù)子集)?;跀?shù)據(jù)流的框架并不明確支持工作集,,所以需要將數(shù)據(jù)輸出到磁盤,然后在每次查詢時重新加載,,這帶來較大的開銷,。

我們提出了一種分布式的內(nèi)存抽象,稱為彈性分布式數(shù)據(jù)集(RDD,,Resilient Distributed Datasets),。它支持基于工作集的應(yīng)用,同時具有數(shù)據(jù)流模型的特點:自動容錯,、位置感知調(diào)度和可伸縮性,。RDD允許用戶在執(zhí)行多個查詢時顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,,這極大地提升了查詢速度,。

RDD提供了一種高度受限的共享內(nèi)存模型,即RDD是只讀的記錄分區(qū)的集合,,只能通過在其他RDD執(zhí)行確定的轉(zhuǎn)換操作(如map,、join和group by)而創(chuàng)建,然而這些限制使得實現(xiàn)容錯的開銷很低,。與分布式共享內(nèi)存系統(tǒng)需要付出高昂代價的檢查點和回滾機制不同,,RDD通過Lineage來重建丟失的分區(qū):一個RDD中包含了如何從其他RDD衍生所必需的相關(guān)信息,,從而不需要檢查點操作就可以重構(gòu)丟失的數(shù)據(jù)分區(qū),。盡管RDD不是一個通用的共享內(nèi)存抽象,但卻具備了良好的描述能力,、可伸縮性和可靠性,,但卻能夠廣泛適用于數(shù)據(jù)并行類應(yīng)用。

第一個指出非循環(huán)數(shù)據(jù)流存在不足的并非是我們,,例如,,Google的Pregel[21],是一種專門用于迭代式圖算法的編程模型,;Twister[13]和HaLoop[8],,是兩種典型的迭代式MapReduce模型。但是,,對于一些特定類型的應(yīng)用,,這些系統(tǒng)提供了一個受限的通信模型。相比之下,,RDD則為基于工作集的應(yīng)用提供了更為通用的抽象,,用戶可以對中間結(jié)果進(jìn)行顯式的命名和物化,,控制其分區(qū),還能執(zhí)行用戶選擇的特定操作(而不是在運行時去循環(huán)執(zhí)行一系列MapReduce步驟),。RDD可以用來描述Pregel,、迭代式MapReduce,以及這兩種模型無法描述的其他應(yīng)用,,如交互式數(shù)據(jù)挖掘工具(用戶將數(shù)據(jù)集裝入內(nèi)存,,然后執(zhí)行ad-hoc查詢)。

Spark是我們實現(xiàn)的RDD系統(tǒng),,在我們內(nèi)部能夠被用于開發(fā)多種并行應(yīng)用,。Spark采用Scala語言[5]實現(xiàn),提供類似于DryadLINQ的集成語言編程接口[34],,使用戶可以非常容易地編寫并行任務(wù),。此外,隨著Scala新版本解釋器的完善,,Spark還能夠用于交互式查詢大數(shù)據(jù)集,。我們相信Spark會是第一個能夠使用有效、通用編程語言,,并在集群上對大數(shù)據(jù)集進(jìn)行交互式分析的系統(tǒng),。

我們通過微基準(zhǔn)和用戶應(yīng)用程序來評估RDD。實驗表明,,在處理迭代式應(yīng)用上Spark比Hadoop快高達(dá)20多倍,,計算數(shù)據(jù)分析類報表的性能提高了40多倍,同時能夠在5-7秒的延時內(nèi)交互式掃描1TB數(shù)據(jù)集,。此外,,我們還在Spark之上實現(xiàn)了Pregel和HaLoop編程模型(包括其位置優(yōu)化策略),以庫的形式實現(xiàn)(分別使用了100和200行Scala代碼),。最后,,利用RDD內(nèi)在的確定性特性,我們還創(chuàng)建了一種Spark調(diào)試工具rddbg,,允許用戶在任務(wù)期間利用Lineage重建RDD,,然后像傳統(tǒng)調(diào)試器那樣重新執(zhí)行任務(wù)。

本文首先在第2部分介紹了RDD的概念,,然后第3部分描述Spark API,,第4部分解釋如何使用RDD表示幾種并行應(yīng)用(包括Pregel和HaLoop),第5部分討論Spark中RDD的表示方法以及任務(wù)調(diào)度器,,第6部分描述具體實現(xiàn)和rddbg,,第7部分對RDD進(jìn)行評估,第8部分給出了相關(guān)研究工作,,最后第9部分總結(jié),。

2.彈性分布式數(shù)據(jù)集(RDD)

本部分描述RDD和編程模型,。首先討論設(shè)計目標(biāo)(2.1),然后定義RDD(2.2),,討論Spark的編程模型(2.3),,并給出一個示例(2.4),最后對比RDD與分布式共享內(nèi)存(2.5),。

2.1 目標(biāo)和概述

我們的目標(biāo)是為基于工作集的應(yīng)用(即多個并行操作重用中間結(jié)果的這類應(yīng)用)提供抽象,,同時保持MapReduce及其相關(guān)模型的優(yōu)勢特性:即自動容錯、位置感知性調(diào)度和可伸縮性,。RDD比數(shù)據(jù)流模型更易于編程,,同時基于工作集的計算也具有良好的描述能力。

在這些特性中,,最難實現(xiàn)的是容錯性,。一般來說,分布式數(shù)據(jù)集的容錯性有兩種方式:即數(shù)據(jù)檢查點和記錄數(shù)據(jù)的更新,。我們面向的是大規(guī)模數(shù)據(jù)分析,,數(shù)據(jù)檢查點操作成本很高:需要通過數(shù)據(jù)中心的網(wǎng)絡(luò)連接在機器之間復(fù)制龐大的數(shù)據(jù)集,而網(wǎng)絡(luò)帶寬往往比內(nèi)存帶寬低得多,,同時還需要消耗更多的存儲資源(在內(nèi)存中復(fù)制數(shù)據(jù)可以減少需要緩存的數(shù)據(jù)量,,而存儲到磁盤則會拖慢應(yīng)用程序)。所以,,我們選擇記錄更新的方式,。但是,如果更新太多,,那么記錄更新成本也不低,。因此,RDD只支持粗粒度轉(zhuǎn)換,,即在大量記錄上執(zhí)行的單個操作,。將創(chuàng)建RDD的一系列轉(zhuǎn)換記錄下來(即Lineage),,以便恢復(fù)丟失的分區(qū),。

雖然只支持粗粒度轉(zhuǎn)換限制了編程模型,但我們發(fā)現(xiàn)RDD仍然可以很好地適用于很多應(yīng)用,,特別是支持?jǐn)?shù)據(jù)并行的批量分析應(yīng)用,,包括數(shù)據(jù)挖掘、機器學(xué)習(xí),、圖算法等,,因為這些程序通常都會在很多記錄上執(zhí)行相同的操作。RDD不太適合那些異步更新共享狀態(tài)的應(yīng)用,,例如并行web爬行器,。因此,,我們的目標(biāo)是為大多數(shù)分析型應(yīng)用提供有效的編程模型,而其他類型的應(yīng)用交給專門的系統(tǒng),。

2.2 RDD抽象

RDD是只讀的,、分區(qū)記錄的集合。RDD只能基于在穩(wěn)定物理存儲中的數(shù)據(jù)集和其他已有的RDD上執(zhí)行確定性操作來創(chuàng)建,。這些確定性操作稱之為轉(zhuǎn)換,,如map、filter,、groupBy,、join(轉(zhuǎn)換不是程開發(fā)人員在RDD上執(zhí)行的操作)。

RDD不需要物化,。RDD含有如何從其他RDD衍生(即計算)出本RDD的相關(guān)信息(即Lineage),,據(jù)此可以從物理存儲的數(shù)據(jù)計算出相應(yīng)的RDD分區(qū)。

2.3 編程模型

在Spark中,,RDD被表示為對象,,通過這些對象上的方法(或函數(shù))調(diào)用轉(zhuǎn)換。

定義RDD之后,,程序員就可以在動作中使用RDD了,。動作是向應(yīng)用程序返回值,或向存儲系統(tǒng)導(dǎo)出數(shù)據(jù)的那些操作,,例如,,count(返回RDD中的元素個數(shù)),collect(返回元素本身),,save(將RDD輸出到存儲系統(tǒng)),。在Spark中,只有在動作第一次使用RDD時,,才會計算RDD(即延遲計算),。這樣在構(gòu)建RDD的時候,運行時通過管道的方式傳輸多個轉(zhuǎn)換,。

程序員還可以從兩個方面控制RDD,,即緩存和分區(qū)。用戶可以請求將RDD緩存,,這樣運行時將已經(jīng)計算好的RDD分區(qū)存儲起來,,以加速后期的重用。緩存的RDD一般存儲在內(nèi)存中,,但如果內(nèi)存不夠,,可以寫到磁盤上。

另一方面,,RDD還允許用戶根據(jù)關(guān)鍵字(key)指定分區(qū)順序,,這是一個可選的功能,。目前支持哈希分區(qū)和范圍分區(qū)。例如,,應(yīng)用程序請求將兩個RDD按照同樣的哈希分區(qū)方式進(jìn)行分區(qū)(將同一機器上具有相同關(guān)鍵字的記錄放在一個分區(qū)),,以加速它們之間的join操作。在Pregel和HaLoop中,,多次迭代之間采用一致性的分區(qū)置換策略進(jìn)行優(yōu)化,,我們同樣也允許用戶指定這種優(yōu)化。

2.4 示例:控制臺日志挖掘

本部分我們通過一個具體示例來闡述RDD,。假定有一個大型網(wǎng)站出錯,,操作員想要檢查Hadoop文件系統(tǒng)(HDFS)中的日志文件(TB級大小)來找出原因,。通過使用Spark,,操作員只需將日志中的錯誤信息裝載到一組節(jié)點的內(nèi)存中,然后執(zhí)行交互式查詢,。首先,,需要在Spark解釋器中輸入如下Scala命令:

1lines = spark.textFile("hdfs://...")
2errors = lines.filter(_.startsWith("ERROR"))
3errors.cache()

第1行從HDFS文件定義了一個RDD(即一個文本行集合),第2行獲得一個過濾后的RDD,,第3行請求將errors緩存起來,。注意在Scala語法中filter的參數(shù)是一個閉包。

這時集群還沒有開始執(zhí)行任何任務(wù),。但是,,用戶已經(jīng)可以在這個RDD上執(zhí)行對應(yīng)的動作,例如統(tǒng)計錯誤消息的數(shù)目:

1errors.count()

用戶還可以在RDD上執(zhí)行更多的轉(zhuǎn)換操作,,并使用轉(zhuǎn)換結(jié)果,,如:

1// Count errors mentioning MySQL:
2errors.filter(_.contains("MySQL")).count()
3// Return the time fields of errors mentioning
4// HDFS as an array (assuming time is field
5// number 3 in a tab-separated format):
6errors.filter(_.contains("HDFS"))
7    .map(_.split('\t')(3))
8    .collect()

使用errors的第一個action運行以后,Spark會把errors的分區(qū)緩存在內(nèi)存中,,極大地加快了后續(xù)計算速度,。注意,最初的RDD lines不會被緩存,。因為錯誤信息可能只占原數(shù)據(jù)集的很小一部分(小到足以放入內(nèi)存),。
最后,為了說明模型的容錯性,,圖1給出了第3個查詢的Lineage圖,。在lines RDD上執(zhí)行filter操作,得到errors,,然后再filter、map后得到新的RDD,,在這個RDD上執(zhí)行collect操作,。Spark調(diào)度器以流水線的方式執(zhí)行后兩個轉(zhuǎn)換,,向擁有errors分區(qū)緩存的節(jié)點發(fā)送一組任務(wù)。此外,,如果某個errors分區(qū)丟失,,Spark只在相應(yīng)的lines分區(qū)上執(zhí)行filter操作來重建該errors分區(qū)。
f1-lineage
圖1 示例中第三個查詢的Lineage圖,。(方框表示RDD,,箭頭表示轉(zhuǎn)換)

2.5 RDD與分布式共享內(nèi)存

為了進(jìn)一步理解RDD是一種分布式的內(nèi)存抽象,表1列出了RDD與分布式共享內(nèi)存(DSM,,Distributed Shared Memory)[24]的對比,。在DSM系統(tǒng)中,應(yīng)用可以向全局地址空間的任意位置進(jìn)行讀寫操作,。(注意這里的DSM,,不僅指傳統(tǒng)的共享內(nèi)存系統(tǒng),還包括那些通過分布式哈希表或分布式文件系統(tǒng)進(jìn)行數(shù)據(jù)共享的系統(tǒng),,比如Piccolo[28])DSM是一種通用的抽象,,但這種通用性同時也使得在商用集群上實現(xiàn)有效的容錯性更加困難。

RDD與DSM主要區(qū)別在于,,不僅可以通過批量轉(zhuǎn)換創(chuàng)建(即“寫”)RDD,,還可以對任意內(nèi)存位置讀寫。也就是說,,RDD限制應(yīng)用執(zhí)行批量寫操作,,這樣有利于實現(xiàn)有效的容錯。特別地,,RDD沒有檢查點開銷,,因為可以使用Lineage來恢復(fù)RDD。而且,,失效時只需要重新計算丟失的那些RDD分區(qū),,可以在不同節(jié)點上并行執(zhí)行,而不需要回滾整個程序,。

表1 RDD與DSM對比
對比項目 RDD 分布式共享內(nèi)存(DSM)
批量或細(xì)粒度操作 細(xì)粒度操作
批量轉(zhuǎn)換操作 細(xì)粒度操作
一致性 不重要(RDD是不可更改的) 取決于應(yīng)用程序或運行時
容錯性 細(xì)粒度,,低開銷(使用Lineage) 需要檢查點操作和程序回滾
落后任務(wù)的處理 任務(wù)備份 很難處理
任務(wù)安排 基于數(shù)據(jù)存放的位置自動實現(xiàn) 取決于應(yīng)用程序(通過運行時實現(xiàn)透明性)
如果內(nèi)存不夠 與已有的數(shù)據(jù)流系統(tǒng)類似 性能較差(交換?)

注意,,通過備份任務(wù)的拷貝,,RDD還可以處理落后任務(wù)(即運行很慢的節(jié)點),這點與MapReduce[12]類似,。而DSM則難以實現(xiàn)備份任務(wù),,因為任務(wù)及其副本都需要讀寫同一個內(nèi)存位置。

與DSM相比,RDD模型有兩個好處,。第一,,對于RDD中的批量操作,運行時將根據(jù)數(shù)據(jù)存放的位置來調(diào)度任務(wù),,從而提高性能,。第二,對于基于掃描的操作,,如果內(nèi)存不足以緩存整個RDD,,就進(jìn)行部分緩存。把內(nèi)存放不下的分區(qū)存儲到磁盤上,,此時性能與現(xiàn)有的數(shù)據(jù)流系統(tǒng)差不多,。

最后看一下讀操作的粒度。RDD上的很多動作(如count和collect)都是批量讀操作,,即掃描整個數(shù)據(jù)集,,可以將任務(wù)分配到距離數(shù)據(jù)最近的節(jié)點上。同時,,RDD也支持細(xì)粒度操作,,即在哈希或范圍分區(qū)的RDD上執(zhí)行關(guān)鍵字查找,。

3. Spark編程接口

Spark用Scala[5]語言實現(xiàn)了RDD的API,。Scala是一種基于JVM的靜態(tài)類型、函數(shù)式,、面向?qū)ο蟮恼Z言,。我們選擇Scala是因為它簡潔(特別適合交互式使用)、有效(因為是靜態(tài)類型),。但是,,RDD抽象并不局限于函數(shù)式語言,也可以使用其他語言來實現(xiàn)RDD,,比如像Hadoop[2]那樣用類表示用戶函數(shù),。

要使用Spark,開發(fā)者需要編寫一個driver程序,,連接到集群以運行Worker,,如圖2所示。Driver定義了一個或多個RDD,,并調(diào)用RDD上的動作,。Worker是長時間運行的進(jìn)程,將RDD分區(qū)以Java對象的形式緩存在內(nèi)存中,。
f2-spark-runtime
圖2 Spark的運行時,。用戶的driver程序啟動多個worker,,worker從分布式文件系統(tǒng)中讀取數(shù)據(jù)塊,并將計算后的RDD分區(qū)緩存在內(nèi)存中,。

再看看2.4中的例子,,用戶執(zhí)行RDD操作時會提供參數(shù),,比如map傳遞一個閉包(closure,,函數(shù)式編程中的概念)。Scala將閉包表示為Java對象,,如果傳遞的參數(shù)是閉包,,則這些對象被序列化,通過網(wǎng)絡(luò)傳輸?shù)狡渌?jié)點上進(jìn)行裝載,。Scala將閉包內(nèi)的變量保存為Java對象的字段,。例如,var x = 5; rdd.map(_ + x) 這段代碼將RDD中的每個元素加5,??偟膩碚f,Spark的語言集成類似于DryadLINQ,。

RDD本身是靜態(tài)類型對象,,由參數(shù)指定其元素類型。例如,,RDD[int]是一個整型RDD,。不過,我們舉的例子幾乎都省略了這個類型參數(shù),,因為Scala支持類型推斷,。

雖然在概念上使用Scala實現(xiàn)RDD很簡單,但還是要處理一些Scala閉包對象的反射問題,。如何通過Scala解釋器來使用Spark還需要更多工作,,這點我們將在第6部分討論。不管怎樣,,我們都不需要修改Scala編譯器,。

3.1 Spark中的RDD操作

表2列出了Spark中的RDD轉(zhuǎn)換和動作。每個操作都給出了標(biāo)識,,其中方括號表示類型參數(shù),。前面說過轉(zhuǎn)換是延遲操作,用于定義新的RDD,;而動作啟動計算操作,,并向用戶程序返回值或向外部存儲寫數(shù)據(jù)。

表3 Spark中支持的RDD轉(zhuǎn)換和動作
轉(zhuǎn)換 map(f : T ) U) : RDD[T] ) RDD[U]
filter(f : T ) Bool) : RDD[T] ) RDD[T]
flatMap(f : T ) Seq[U]) : RDD[T] ) RDD[U]
sample(fraction : Float) : RDD[T] ) RDD[T] (Deterministic sampling)
groupByKey() : RDD[(K, V)] ) RDD[(K, Seq[V])]
reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)]
union() : (RDD[T]; RDD[T]) ) RDD[T]
join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))]
cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq[V], Seq[W]))]
crossProduct() : (RDD[T]; RDD[U]) ) RDD[(T, U)]
mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning)
sort(c : Comparator[K]) : RDD[(K, V)] ) RDD[(K, V)]
partitionBy(p : Partitioner[K]) : RDD[(K, V)] ) RDD[(K, V)]
動作 count() : RDD[T] ) Long
collect() : RDD[T] ) Seq[T]
reduce(f : (T; T) ) T) : RDD[T] ) T
lookup(k : K) : RDD[(K, V)] ) Seq[V] (On hash/range partitioned RDDs)
save(path : String) : Outputs RDD to a storage system, e.g., HDFS

注意,,有些操作只對鍵值對可用,,比如join,。另外,函數(shù)名與Scala及其他函數(shù)式語言中的API匹配,,例如map是一對一的映射,,而flatMap是將每個輸入映射為一個或多個輸出(與MapReduce中的map類似)。

除了這些操作以外,,用戶還可以請求將RDD緩存起來,。而且,用戶還可以通過Partitioner類獲取RDD的分區(qū)順序,,然后將另一個RDD按照同樣的方式分區(qū),。有些操作會自動產(chǎn)生一個哈希或范圍分區(qū)的RDD,,像groupByKey,,reduceByKey和sort等。

4. 應(yīng)用程序示例

現(xiàn)在我們講述如何使用RDD表示幾種基于數(shù)據(jù)并行的應(yīng)用,。首先討論一些迭代式機器學(xué)習(xí)應(yīng)用(4.1),,然后看看如何使用RDD描述幾種已有的集群編程模型,即MapReduce(4.2),,Pregel(4.3),,和Hadoop(4.4),。最后討論一下RDD不適合哪些應(yīng)用(4.5),。

4.1 迭代式機器學(xué)習(xí)

很多機器學(xué)習(xí)算法都具有迭代特性,,運行迭代優(yōu)化方法來優(yōu)化某個目標(biāo)函數(shù),,例如梯度下降方法,。如果這些算法的工作集能夠放入內(nèi)存,將極大地加速程序運行,。而且,,這些算法通常采用批量操作,,例如映射和求和,這樣更容易使用RDD來表示,。

例如下面的程序是邏輯回歸[15]的實現(xiàn),。邏輯回歸是一種常見的分類算法,,即尋找一個最佳分割兩組點(即垃圾郵件和非垃圾郵件)的超平面w。算法采用梯度下降的方法:開始時w為隨機值,,在每一次迭代的過程中,,對w的函數(shù)求和,,然后朝著優(yōu)化的方向移動w,。

1val points = spark.textFile(...)
2     .map(parsePoint).persist()
3var w = // random initial vector
4for (i <- 1 to ITERATIONS) {
5     val gradient = points.map{ p =>
6          p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
7     }.reduce((a,b) => a+b)
8     w -= gradient
9}

首先定義一個名為points的緩存RDD,這是在文本文件上執(zhí)行map轉(zhuǎn)換之后得到的,,即將每個文本行解析為一個Point對象,。然后在points上反復(fù)執(zhí)行map和reduce操作,,每次迭代時通過對當(dāng)前w的函數(shù)進(jìn)行求和來計算梯度。7.1小節(jié)我們將看到這種在內(nèi)存中緩存points的方式,,比每次迭代都從磁盤文件裝載數(shù)據(jù)并進(jìn)行解析要快得多,。

已經(jīng)在Spark中實現(xiàn)的迭代式機器學(xué)習(xí)算法還有:kmeans(像邏輯回歸一樣每次迭代時執(zhí)行一對map和reduce操作),期望最大化算法(EM,,兩個不同的map/reduce步驟交替執(zhí)行),,交替最小二乘矩陣分解和協(xié)同過濾算法。Chu等人提出迭代式MapReduce也可以用來實現(xiàn)常用的學(xué)習(xí)算法[11],。

4.2 使用RDD實現(xiàn)MapReduce

MapReduce模型[12]很容易使用RDD進(jìn)行描述,。假設(shè)有一個輸入數(shù)據(jù)集(其元素類型為T),和兩個函數(shù)myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],,代碼如下:

1data.flatMap(myMap)
2    .groupByKey()
3    .map((k, vs) => myReduce(k, vs))

如果任務(wù)包含combiner,,則相應(yīng)的代碼為:

1data.flatMap(myMap)
2    .reduceByKey(myCombiner)
3    .map((k, v) => myReduce(k, v))

ReduceByKey操作在mapper節(jié)點上執(zhí)行部分聚集,與MapReduce的combiner類似,。

4.3 使用RDD實現(xiàn)Pregel

Pregel[21]是面向圖算法的基于BSP范式[32]的編程模型,。程序由一系列超步(Superstep)協(xié)調(diào)迭代運行。在每個超步中,,各個頂點執(zhí)行用戶函數(shù),,并更新相應(yīng)的頂點狀態(tài),,變異圖拓?fù)洌缓笙蛳乱粋€超步的頂點集發(fā)送消息,。這種模型能夠描述很多圖算法,,包括最短路徑,雙邊匹配和PageRank等,。

以PageRank為例介紹一下Pregel的實現(xiàn),。當(dāng)前PageRank[7]記為r,頂點表示狀態(tài),。在每個超步中,,各個頂點向其所有鄰居發(fā)送貢獻(xiàn)值r/n,這里n是鄰居的數(shù)目,。下一個超步開始時,,每個頂點將其分值(rank)更新為 α/N + (1 - α) * Σci,這里的求和是各個頂點收到的所有貢獻(xiàn)值的和,,N是頂點的總數(shù),。

Pregel將輸入的圖劃分到各個worker上,并存儲在其內(nèi)存中,。在每個超步中,,各個worker通過一種類似MapReduce的Shuffle操作交換消息。

Pregel的通信模式可以用RDD來描述,,如圖3,。主要思想是:將每個超步中的頂點狀態(tài)和要發(fā)送的消息存儲為RDD,然后根據(jù)頂點ID分組,,進(jìn)行Shuffle通信(即cogroup操作),。然后對每個頂點ID上的狀態(tài)和消息應(yīng)用用戶函數(shù)(即mapValues操作),產(chǎn)生一個新的RDD,,即(VertexID, (NewState, OutgoingMessages)),。然后執(zhí)行map操作分離出下一次迭代的頂點狀態(tài)和消息(即mapValues和flatMap操作)。代碼如下:

1val vertices = // RDD of (ID, State) pairs
2val messages = // RDD of (ID, Message) pairs
3val grouped = vertices.cogroup(messages)
4val newData = grouped.mapValues {
5    (vert, msgs) => userFunc(vert, msgs)
6    // returns (newState, outgoingMsgs)
7}.cache()
8val newVerts = newData.mapValues((v,ms) => v)
9val newMsgs = newData.flatMap((id,(v,ms)) => ms)

f3-iteration-pregel-using_rdd
圖3 使用RDD實現(xiàn)Pregel時,,一步迭代的數(shù)據(jù)流,。(方框表示RDD,箭頭表示轉(zhuǎn)換)
需要注意的是,,這種實現(xiàn)方法中,,RDD grouped,newData和newVerts的分區(qū)方法與輸入RDD vertices一樣,。所以,,頂點狀態(tài)一直存在于它們開始執(zhí)行的機器上,這跟原Pregel一樣,,這樣就減少了通信成本,。因為cogroup和mapValues保持了與輸入RDD相同的分區(qū)方法,,所以分區(qū)是自動進(jìn)行的。

完整的Pregel編程模型還包括其他工具,,比如combiner,,附錄A討論了它們的實現(xiàn)。下面將討論Pregel的容錯性,,以及如何在實現(xiàn)相同容錯性的同時減少需要執(zhí)行檢查點操作的數(shù)據(jù)量,。

我們差不多用了100行Scala代碼在Spark上實現(xiàn)了一個類Pregel的API。7.2小節(jié)將使用PageRank算法評估它的性能,。

4.3.1 Pregel容錯

當(dāng)前,,Pregel基于檢查點機制來為頂點狀態(tài)及其消息實現(xiàn)容錯[21]。然而作者是這樣描述的:通過在其它的節(jié)點上記錄已發(fā)消息日志,,然后單獨重建丟失的分區(qū),,只需要恢復(fù)局部數(shù)據(jù)即可。上面提到這兩種方式,,RDD都能夠很好地支持,。

通過4.3小節(jié)的實現(xiàn),Spark總是能夠基于Lineage實現(xiàn)頂點和消息RDD的重建,,但是由于過長的Lineage鏈,恢復(fù)可能會付出高昂的代價,。因為迭代RDD依賴于上一個RDD,,對于部分分區(qū)來說,節(jié)點故障可能會導(dǎo)致這些分區(qū)狀態(tài)的所有迭代版本丟失,,這就要求使用一種“級聯(lián)-重新執(zhí)行”[20]的方式去依次重建每一個丟失的分區(qū),。為了避免這個問題,用戶可以周期性地在頂點和消息RDD上執(zhí)行save操作,,將狀態(tài)信息保存到持久存儲中,。然后,Spark能夠在失敗的時候自動地重新計算這些丟失的分區(qū)(而不是回滾整個程序),。

最后,,我們意識到,RDD也能夠?qū)崿F(xiàn)檢查點數(shù)據(jù)的reduce操作,,這要求通過一種高效的檢查點方案來表達(dá)檢查點數(shù)據(jù),。在很多Pregel作業(yè)中,頂點狀態(tài)都包括可變與不可變的組件,,例如,,在PageRank中,與一個頂點相鄰的頂點列表是不可變的,,但是它們的排名是可變的,,在這種情況下,,我們可以使用一個來自可變數(shù)據(jù)的單獨RDD來替換不可變RDD,基于這樣一個較短的Lineage鏈,,檢查點僅僅是可變狀態(tài),,圖4解釋了這種方式。
f4-data-flow-of-pregel-using-rdd
圖4 經(jīng)過優(yōu)化的Pregel使用RDD的數(shù)據(jù)流,??勺儬顟B(tài)RDD必須設(shè)置檢查點,不可變狀態(tài)才可被快速重建,。
在PageRank中,,不可變狀態(tài)(相鄰頂點列表)遠(yuǎn)大于可變狀態(tài)(浮點值),所以這種方式能夠極大地降低開銷,。

4.4 使用RDD實現(xiàn)HaLoop

HaLoop[8]是Hadoop的一個擴(kuò)展版本,,它能夠改善具有迭代特性的MapReduce程序的性能?;贖aLoop編程模型的應(yīng)用,,使用reduce階段的輸出作為map階段下一輪迭代的輸入。它的循環(huán)感知任務(wù)調(diào)度器能夠保證,,在每一輪迭代中處理同一個分區(qū)數(shù)據(jù)的連續(xù)map和reduce任務(wù),,一定能夠在同一臺物理機上執(zhí)行。確保迭代間locality特性,,reduce數(shù)據(jù)在物理節(jié)點之間傳輸,,并且允許數(shù)據(jù)緩存在本地磁盤而能夠被后續(xù)迭代重用。

使用RDD來優(yōu)化HaLoop,,我們在Spark上實現(xiàn)了一個類似HaLoop的API,,這個庫只使用了200行Scala代碼。通過partitionBy能夠保證跨迭代的分區(qū)的一致性,,每一個階段的輸入和輸出被緩存以用于后續(xù)迭代,。

4.5 不適合使用RDD的應(yīng)用

在2.1節(jié)我們討論過,RDD適用于具有批量轉(zhuǎn)換需求的應(yīng)用,,并且相同的操作作用于數(shù)據(jù)集的每一個元素上,。在這種情況下,RDD能夠記住每個轉(zhuǎn)換操作,,對應(yīng)于Lineage圖中的一個步驟,,恢復(fù)丟失分區(qū)數(shù)據(jù)時不需要寫日志記錄大量數(shù)據(jù)。RDD不適合那些通過異步細(xì)粒度地更新來共享狀態(tài)的應(yīng)用,,例如Web應(yīng)用中的存儲系統(tǒng),,或者增量抓取和索引Web數(shù)據(jù)的系統(tǒng),這樣的應(yīng)用更適合使用一些傳統(tǒng)的方法,例如數(shù)據(jù)庫,、RAMCloud[26],、Percolator[27]和Piccolo[28]。我們的目標(biāo)是,,面向批量分析應(yīng)用的這類特定系統(tǒng),,提供一種高效的編程模型,而不是一些異步應(yīng)用程序,。

5. RDD的描述及作業(yè)調(diào)度

我們希望在不修改調(diào)度器的前提下,,支持RDD上的各種轉(zhuǎn)換操作,同時能夠從這些轉(zhuǎn)換獲取Lineage信息,。為此,,我們?yōu)镽DD設(shè)計了一組小型通用的內(nèi)部接口。

簡單地說,,每個RDD都包含:(1)一組RDD分區(qū)(partition,,即數(shù)據(jù)集的原子組成部分);(2)對父RDD的一組依賴,,這些依賴描述了RDD的Lineage,;(3)一個函數(shù),即在父RDD上執(zhí)行何種計算,;(4)元數(shù)據(jù),,描述分區(qū)模式和數(shù)據(jù)存放的位置。例如,,一個表示HDFS文件的RDD包含:各個數(shù)據(jù)塊的一個分區(qū),,并知道各個數(shù)據(jù)塊放在哪些節(jié)點上。而且這個RDD上的map操作結(jié)果也具有同樣的分區(qū),,map函數(shù)是在父數(shù)據(jù)上執(zhí)行的。表3總結(jié)了RDD的內(nèi)部接口,。

表3 Spark中RDD的內(nèi)部接口
操作 含義
partitions() 返回一組Partition對象
preferredLocations(p) 根據(jù)數(shù)據(jù)存放的位置,,返回分區(qū)p在哪些節(jié)點訪問更快
dependencies() 返回一組依賴
iterator(p, parentIters) 按照父分區(qū)的迭代器,逐個計算分區(qū)p的元素
partitioner() 返回RDD是否hash/range分區(qū)的元數(shù)據(jù)信息

設(shè)計接口的一個關(guān)鍵問題就是,,如何表示RDD之間的依賴,。我們發(fā)現(xiàn)RDD之間的依賴關(guān)系可以分為兩類,即:(1)窄依賴(narrow dependencies):子RDD的每個分區(qū)依賴于常數(shù)個父分區(qū)(即與數(shù)據(jù)規(guī)模無關(guān)),;(2)寬依賴(wide dependencies):子RDD的每個分區(qū)依賴于所有父RDD分區(qū),。例如,map產(chǎn)生窄依賴,,而join則是寬依賴(除非父RDD被哈希分區(qū)),。另一個例子見圖5。
f5-rdd-narrow-and-wide-dependencies
圖5 窄依賴和寬依賴的例子,。(方框表示RDD,,實心矩形表示分區(qū))
區(qū)分這兩種依賴很有用,。首先,窄依賴允許在一個集群節(jié)點上以流水線的方式(pipeline)計算所有父分區(qū),。例如,,逐個元素地執(zhí)行map、然后filter操作,;而寬依賴則需要首先計算好所有父分區(qū)數(shù)據(jù),,然后在節(jié)點之間進(jìn)行Shuffle,這與MapReduce類似,。第二,,窄依賴能夠更有效地進(jìn)行失效節(jié)點的恢復(fù),即只需重新計算丟失RDD分區(qū)的父分區(qū),,而且不同節(jié)點之間可以并行計算,;而對于一個寬依賴關(guān)系的Lineage圖,單個節(jié)點失效可能導(dǎo)致這個RDD的所有祖先丟失部分分區(qū),,因而需要整體重新計算,。

通過RDD接口,Spark只需要不超過20行代碼實現(xiàn)便可以實現(xiàn)大多數(shù)轉(zhuǎn)換,。5.1小節(jié)給出了例子,,然后我們討論了怎樣使用RDD接口進(jìn)行調(diào)度(5.2),最后討論一下基于RDD的程序何時需要數(shù)據(jù)檢查點操作(5.3),。

5.1 RDD實現(xiàn)舉例

HDFS文件:目前為止我們給的例子中輸入RDD都是HDFS文件,,對這些RDD可以執(zhí)行:partitions操作返回各個數(shù)據(jù)塊的一個分區(qū)(每個Partition對象中保存數(shù)據(jù)塊的偏移),preferredLocations操作返回數(shù)據(jù)塊所在的節(jié)點列表,,iterator操作對數(shù)據(jù)塊進(jìn)行讀取,。

map:任何RDD上都可以執(zhí)行map操作,返回一個MappedRDD對象,。該操作傳遞一個函數(shù)參數(shù)給map,,對父RDD上的記錄按照iterator的方式執(zhí)行這個函數(shù),并返回一組符合條件的父RDD分區(qū)及其位置,。

union:在兩個RDD上執(zhí)行union操作,,返回兩個父RDD分區(qū)的并集。通過相應(yīng)父RDD上的窄依賴關(guān)系計算每個子RDD分區(qū)(注意union操作不會過濾重復(fù)值,,相當(dāng)于SQL中的UNION ALL),。

sample:抽樣與映射類似,但是sample操作中,,RDD需要存儲一個隨機數(shù)產(chǎn)生器的種子,,這樣每個分區(qū)能夠確定哪些父RDD記錄被抽樣。

join:對兩個RDD執(zhí)行join操作可能產(chǎn)生窄依賴(如果這兩個RDD擁有相同的哈希分區(qū)或范圍分區(qū)),可能是寬依賴,,也可能兩種依賴都有(比如一個父RDD有分區(qū),,而另一父RDD沒有)。

5.2 Spark任務(wù)調(diào)度器

調(diào)度器根據(jù)RDD的結(jié)構(gòu)信息為每個動作確定有效的執(zhí)行計劃,。調(diào)度器的接口是runJob函數(shù),,參數(shù)為RDD及其分區(qū)集,和一個RDD分區(qū)上的函數(shù),。該接口足以表示Spark中的所有動作(即count,、collect、save等),。

總的來說,,我們的調(diào)度器跟Dryad類似,但我們還考慮了哪些RDD分區(qū)是緩存在內(nèi)存中的,。調(diào)度器根據(jù)目標(biāo)RDD的Lineage圖創(chuàng)建一個由stage構(gòu)成的無回路有向圖(DAG),。每個stage內(nèi)部盡可能多地包含一組具有窄依賴關(guān)系的轉(zhuǎn)換,并將它們流水線并行化(pipeline),。stage的邊界有兩種情況:一是寬依賴上的Shuffle操作,;二是已緩存分區(qū),它可以縮短父RDD的計算過程,。例如圖6,。父RDD完成計算后,可以在stage內(nèi)啟動一組任務(wù)計算丟失的分區(qū),。
f6-spark-compute-stage
圖6 Spark怎樣劃分任務(wù)階段(stage)的例子,。實線方框表示RDD,實心矩形表示分區(qū)(黑色表示該分區(qū)被緩存),。要在RDD G上執(zhí)行一個動作,,調(diào)度器根據(jù)寬依賴創(chuàng)建一組stage,并在每個stage內(nèi)部將具有窄依賴的轉(zhuǎn)換流水線化(pipeline),。 本例不用再執(zhí)行stage 1,,因為B已經(jīng)存在于緩存中了,所以只需要運行2和3,。

調(diào)度器根據(jù)數(shù)據(jù)存放的位置分配任務(wù),以最小化通信開銷,。如果某個任務(wù)需要處理一個已緩存分區(qū),,則直接將任務(wù)分配給擁有這個分區(qū)的節(jié)點。否則,,如果需要處理的分區(qū)位于多個可能的位置(例如,,由HDFS的數(shù)據(jù)存放位置決定),則將任務(wù)分配給這一組節(jié)點。

對于寬依賴(例如需要Shuffle的依賴),,目前的實現(xiàn)方式是,,在擁有父分區(qū)的節(jié)點上將中間結(jié)果物化,簡化容錯處理,,這跟MapReduce中物化map輸出很像,。

如果某個任務(wù)失效,只要stage中的父RDD分區(qū)可用,,則只需在另一個節(jié)點上重新運行這個任務(wù)即可,。如果某些stage不可用(例如,Shuffle時某個map輸出丟失),,則需要重新提交這個stage中的所有任務(wù)來計算丟失的分區(qū),。

最后,lookup動作允許用戶從一個哈?;蚍秶謪^(qū)的RDD上,,根據(jù)關(guān)鍵字讀取一個數(shù)據(jù)元素。這里有一個設(shè)計問題,。Driver程序調(diào)用lookup時,,只需要使用當(dāng)前調(diào)度器接口計算關(guān)鍵字所在的那個分區(qū)。當(dāng)然任務(wù)也可以在集群上調(diào)用lookup,,這時可以將RDD視為一個大的分布式哈希表,。這種情況下,任務(wù)和被查詢的RDD之間的并沒有明確的依賴關(guān)系(因為worker執(zhí)行的是lookup),,如果所有節(jié)點上都沒有相應(yīng)的緩存分區(qū),,那么任務(wù)需要告訴調(diào)度器計算哪些RDD來完成查找操作。

5.3 檢查點

盡管RDD中的Lineage信息可以用來故障恢復(fù),,但對于那些Lineage鏈較長的RDD來說,,這種恢復(fù)可能很耗時。例如4.3小節(jié)中的Pregel任務(wù),,每次迭代的頂點狀態(tài)和消息都跟前一次迭代有關(guān),,所以Lineage鏈很長。如果將Lineage鏈存到物理存儲中,,再定期對RDD執(zhí)行檢查點操作就很有效,。

一般來說,Lineage鏈較長,、寬依賴的RDD需要采用檢查點機制,。這種情況下,集群的節(jié)點故障可能導(dǎo)致每個父RDD的數(shù)據(jù)塊丟失,,因此需要全部重新計算[20],。將窄依賴的RDD數(shù)據(jù)存到物理存儲中可以實現(xiàn)優(yōu)化,,例如前面4.1小節(jié)邏輯回歸的例子,將數(shù)據(jù)點和不變的頂點狀態(tài)存儲起來,,就不再需要檢查點操作,。

當(dāng)前Spark版本提供檢查點API,但由用戶決定是否需要執(zhí)行檢查點操作,。今后我們將實現(xiàn)自動檢查點,,根據(jù)成本效益分析確定RDD Lineage圖中的最佳檢查點位置。

值得注意的是,,因為RDD是只讀的,,所以不需要任何一致性維護(hù)(例如寫復(fù)制策略,分布式快照或者程序暫停等)帶來的開銷,,后臺執(zhí)行檢查點操作,。

我們使用10000行Scala代碼實現(xiàn)了Spark。系統(tǒng)可以使用任何Hadoop數(shù)據(jù)源(如HDFS,,Hbase)作為輸入,,這樣很容易與Hadoop環(huán)境集成。Spark以庫的形式實現(xiàn),,不需要修改Scala編譯器,。

這里討論關(guān)于實現(xiàn)的三方面問題:(1)修改Scala解釋器,允許交互模式使用Spark(6.1),;(2)緩存管理(6.2),;(3)調(diào)試工具rddbg(6.3)。

6. 實現(xiàn)

6.1 解釋器的集成

像Ruby和Python一樣,,Scala也有一個交互式shell,。基于內(nèi)存的數(shù)據(jù)可以實現(xiàn)低延時,,我們希望允許用戶從解釋器交互式地運行Spark,,從而在大數(shù)據(jù)集上實現(xiàn)大規(guī)模并行數(shù)據(jù)挖掘。

Scala解釋器通常根據(jù)將用戶輸入的代碼行,,來對類進(jìn)行編譯,,接著裝載到JVM中,然后調(diào)用類的函數(shù),。這個類是一個包含輸入行變量或函數(shù)的單例對象,,并在一個初始化函數(shù)中運行這行代碼。例如,,如果用戶輸入代碼var x = 5,,接著又輸入println(x),則解釋器會定義一個包含x的Line1類,,并將第2行編譯為println(Line1.getInstance().x),。

在Spark中我們對解釋器做了兩點改動:

  1. 類傳輸:解釋器能夠支持基于HTTP傳輸類字節(jié)碼,這樣worker節(jié)點就能獲取輸入每行代碼對應(yīng)的類的字節(jié)碼,。
  2. 改進(jìn)的代碼生成邏輯:通常每行上創(chuàng)建的單態(tài)對象通過對應(yīng)類上的靜態(tài)方法進(jìn)行訪問,。也就是說,如果要序列化一個閉包,,它引用了前面代碼行中變量,,比如上面的例子Line1.x,Java不會根據(jù)對象關(guān)系傳輸包含x的Line1實例,。所以worker節(jié)點不會收到x。我們將這種代碼生成邏輯改為直接引用各個行對象的實例。圖7說明了解釋器如何將用戶輸入的一組代碼行解釋為Java對象,。

f7-spark-interpreter-translation
圖7 Spark解釋器如何將用戶輸入的兩行代碼解釋為Java對象
Spark解釋器便于跟蹤處理大量對象關(guān)系引用,,并且便利了HDFS數(shù)據(jù)集的研究。我們計劃以Spark解釋器為基礎(chǔ),,開發(fā)提供高級數(shù)據(jù)分析語言支持的交互式工具,,比如類似SQL和Matlab。

6.2 緩存管理

Worker節(jié)點將RDD分區(qū)以Java對象的形式緩存在內(nèi)存中,。由于大部分操作是基于掃描的,,采取RDD級的LRU(最近最少使用)替換策略(即不會為了裝載一個RDD分區(qū)而將同一RDD的其他分區(qū)替換出去)。目前這種簡單的策略適合大多數(shù)用戶應(yīng)用,。另外,,使用帶參數(shù)的cache操作可以設(shè)定RDD的緩存優(yōu)先級。

6.3 rddbg:RDD程序的調(diào)試工具

RDD的初衷是為了實現(xiàn)容錯以能夠再計算(re-computation),,這個特性使得調(diào)試更容易,。我們創(chuàng)建了一個名為rddbg的調(diào)試工具,它是通過基于程序記錄的Lineage信息來實現(xiàn)的,,允許用戶:(1)重建任何由程序創(chuàng)建的RDD,,并執(zhí)行交互式查詢;(2)使用一個單進(jìn)程Java調(diào)試器(如jdb)傳入計算好的RDD分區(qū),,能夠重新運行作業(yè)中的任何任務(wù),。

我們強調(diào)一下,rddbg不是一個完全重放的調(diào)試器:特別是不對非確定性的代碼或動作進(jìn)行重放,。但如果某個任務(wù)一直運行很慢(比如由于數(shù)據(jù)分布不均勻或者異常輸入等原因),,仍然可以用它來幫助找到其中的邏輯錯誤和性能問題。

舉個例子,,我們使用rddbg去解決用戶Spam分類作業(yè)中的一個bug,,這個作業(yè)中的每次迭代都產(chǎn)生0值。在調(diào)試器中重新執(zhí)行reduce任務(wù),,很快就能發(fā)現(xiàn),,輸入的權(quán)重向量(存儲在一個用戶自定義的向量類中)竟然是空值,。由于從一個未初始化的稀疏向量中讀取總是返回0,運行時也不會拋出異常,。在這個向量類中設(shè)置一個斷點,,然后運行這個任務(wù),引導(dǎo)程序很快就運行到設(shè)置的斷點處,,我們發(fā)現(xiàn)向量類的一個數(shù)組字段的值為空,,我們診斷出了這個bug:稀疏向量類中的數(shù)據(jù)字段被錯誤地使用transient來修飾,導(dǎo)致序列化時忽略了該字段的數(shù)據(jù),。

rddbg給程序執(zhí)行帶來的開銷很小,。程序本來就需要將各個RDD中的所有閉包序列化并通過網(wǎng)絡(luò)傳送,只不過使用rddbg同時還要將這些閉集記錄到磁盤,。

7. 評估

我們在Amazon EC2[1]上進(jìn)行了一系列實驗來評估Spark及RDD的性能,,并與Hadoop及其他應(yīng)用程序的基準(zhǔn)進(jìn)行了對比??偟恼f來,,結(jié)果如下:
(1)對于迭代式機器學(xué)習(xí)應(yīng)用,Spark比Hadoop快20多倍,。這種加速比是因為:數(shù)據(jù)存儲在內(nèi)存中,,同時Java對象緩存避免了反序列化操作。
(2)用戶編寫的應(yīng)用程序執(zhí)行結(jié)果很好,。例如,,Spark分析報表比Hadoop快40多倍。
(3)如果節(jié)點發(fā)生失效,,通過重建那些丟失的RDD分區(qū),,Spark能夠?qū)崿F(xiàn)快速恢復(fù)。
(4)Spark能夠在5-7s延時范圍內(nèi),,交互式地查詢1TB大小的數(shù)據(jù)集,。
我們基準(zhǔn)測試首先從一個運行在Hadoop上的具有迭代特征的機器學(xué)習(xí)應(yīng)用(7.1)和PageRank(7.2)開始,然后評估在Spark中當(dāng)工作集不能適應(yīng)緩存(7.4)時系統(tǒng)容錯恢復(fù)能力(7.3),,最后討論用戶應(yīng)用程序(7.5)和交互式數(shù)據(jù)挖掘(7.6)的結(jié)果,。
除非特殊說明,我們的實驗使用m1.xlarge EC2 節(jié)點,,4核15GB內(nèi)存,,使用HDFS作為持久存儲,塊大小為256M,。在每個作業(yè)運行執(zhí)行時,,為了保證磁盤讀時間更加精確,我們清理了集群中每個節(jié)點的操作系統(tǒng)緩存,。

7.1 可迭代的機器學(xué)習(xí)應(yīng)用

我們實現(xiàn)了2個迭代式機器學(xué)習(xí)(ML)應(yīng)用,,Logistic回歸和K-means算法,,與如下系統(tǒng)進(jìn)行性能對比:

  • Hadoop:Hadoop 0.20.0穩(wěn)定版。
  • HadoopBinMem:在首輪迭代中執(zhí)行預(yù)處理,,通過將輸入數(shù)據(jù)轉(zhuǎn)換成為開銷較低的二進(jìn)制格式來減少后續(xù)迭代過程中文本解析的開銷,,在HDFS中加載到內(nèi)存。
  • Spark:基于RDD的系統(tǒng),,在首輪迭代中緩存Java對象以減少后續(xù)迭代過程中解析、反序列化的開銷,。

我們使用同一數(shù)據(jù)集在相同條件下運行Logistic回歸和K-means算法:使用400個任務(wù)(每個任務(wù)處理的輸入數(shù)據(jù)塊大小為256M),,在25-100臺機器,執(zhí)行10次迭代處理100G輸入數(shù)據(jù)集(表4),。兩個作業(yè)的關(guān)鍵區(qū)別在于每輪迭代單個字節(jié)的計算量不同,。K-means的迭代時間取決于更新聚類坐標(biāo)耗時,Logistic回歸是非計算密集型的,,但是在序列化和解析過程中非常耗時,。
由于典型的機器學(xué)習(xí)算法需要數(shù)10輪迭代,然后再合并,,我們分別統(tǒng)計了首輪迭代和后續(xù)迭代計算的耗時,,并從中發(fā)現(xiàn),在內(nèi)存中緩存RDD極大地加快了后續(xù)迭代的速度,。

表4 用于Spark基準(zhǔn)程序的數(shù)據(jù)
應(yīng)用 數(shù)據(jù)描述 大小
Logistic回歸 10億9維點數(shù)據(jù) 100G
K-means 10億10維點數(shù)據(jù)(k=10) 100G
PageRank 400萬Wikipedia文章超鏈接圖 49G
交互式數(shù)據(jù)挖掘 Wikipedia瀏覽日志(2008-10~2009-4) 1TB

首輪迭代,。在首輪迭代過程中,三個系統(tǒng)都是從HDFS中讀取文本數(shù)據(jù)作為輸入,。圖9中“First Iteration”顯示了首輪迭代的柱狀圖,,實驗中Spark快于Hadoop,主要是因為Hadoop中的各個分布式組件基于心跳協(xié)議來發(fā)送信號帶來了開銷,。HadoopBinMem是最慢的,,因為它通過一個額外的MapReduce作業(yè)將數(shù)據(jù)轉(zhuǎn)換成二進(jìn)制格式。
f8-first-iteration-bars
圖8 首輪迭代后Hadoop,、HadoopBinMen,、Spark運行時間對比

后續(xù)迭代。圖9顯示了后續(xù)迭代的平均耗時,,圖8對比了不同聚類大小條件下耗時情況,,我們發(fā)現(xiàn)在100個節(jié)點上運行Logistic回歸程序,Spark比Hadoop,、HadoopBinMem分別快25.3,、20.7倍。從圖8(b)可以看到,,Spark僅僅比Hadoop,、HadoopBinMem分別快1.9,、3.2倍,這是因為K-means程序的開銷取決于計算(用更多的節(jié)點有助于提高計算速度的倍數(shù)),。

后續(xù)迭代中,,Hadoop仍然從HDFS讀取文本數(shù)據(jù)作為輸入,所以從首輪迭代開始Hadoop的迭代時間并沒有明顯的改善,。使用預(yù)先轉(zhuǎn)換的SequenceFile文件(Hadoop內(nèi)建的二進(jìn)制文件格式),,HadoopBinMem在后續(xù)迭代中節(jié)省了解析的代價,但是仍然帶來的其他的開銷,,如從HDFS讀SequenceFile文件并轉(zhuǎn)換成Java對象,。因為Spark直接讀取緩存于RDD中的Java對象,隨著聚類尺寸的線性增長,,迭代時間大幅下降,。
f9-length-of-first-and-later-iterations
圖9:首輪及其后續(xù)迭代平均時間對比
理解速度提升。我們非常驚奇地發(fā)現(xiàn),,Spark甚至勝過了基于內(nèi)存存儲二進(jìn)制數(shù)據(jù)的Hadoop(HadoopBinMem),,幅度高達(dá)20倍之多,Hadoop運行慢是由于如下幾個原因:

  1. Hadoop軟件棧的最小開銷
  2. 讀數(shù)據(jù)時HDFS棧的開銷
  3. 將二進(jìn)制記錄轉(zhuǎn)換成內(nèi)存Java對象的代價

為了估測1,,我們運行空的Hadoop作業(yè),,僅僅執(zhí)行作業(yè)的初始化、啟動任務(wù),、清理工作就至少耗時25秒,。對于2,我們發(fā)現(xiàn)為了服務(wù)每一個HDFS數(shù)據(jù)塊,,HDFS進(jìn)行了多次復(fù)制以及計算校驗和操作,。

為了估測3,我們在單個節(jié)點上運行了微基準(zhǔn)程序,,在輸入的256M數(shù)據(jù)上計算Logistic回歸,,結(jié)果如表5所示。首先,,在內(nèi)存中的HDFS文件和本地文件的不同導(dǎo)致通過HDFS接口讀取耗時2秒,,甚至數(shù)據(jù)就在本地內(nèi)存中。其次,,文本和二進(jìn)制格式輸入的不同造成了解析耗時7秒的開銷,。最后,預(yù)解析的二進(jìn)制文件轉(zhuǎn)換為內(nèi)存中的Java對象,,耗時3秒,。每個節(jié)點處理多個塊時這些開銷都會累積起來,然而通過緩存RDD作為內(nèi)存中的Java對象,Spark只需要耗時3秒,。

表5 Logistic回歸迭代時間

內(nèi)存中的HDFS文件 內(nèi)存中的本地文件 緩存的RDD
文本輸入

二進(jìn)制輸入
15.38 (0.26)

8.38 (0.10)
13.13 (0.26)

6.86 (0.02)
2.93 (0.31)

2.93 (0.31)

7.2 PageRank

通過使用存儲在HDFS上的49G Wikipedia導(dǎo)出數(shù)據(jù),,我們比較了使用RDD實現(xiàn)的Pregel與使用Hadoop計算PageRank的性能。PageRank算法通過10輪迭代處理了大約400萬文章的鏈接圖數(shù)據(jù),,圖10顯示了在30個節(jié)點上,,Spark處理速度是Hadoop的2倍多,改進(jìn)后對輸入進(jìn)行Hash分區(qū)速度提升到2.6倍,,使用Combiner后提升到3.6倍,,這些結(jié)果數(shù)據(jù)也隨著節(jié)點擴(kuò)展到60個時同步放大。
f10-compare-spark-and-hadoop
圖10 迭代時間對比

7.3 容錯恢復(fù)

基于K-means算法應(yīng)用程序,,我們評估了在單點故障(SPOF)時使用Lneage信息創(chuàng)建RDD分區(qū)的開銷,。圖11顯示了,K-means應(yīng)用程序運行在75個節(jié)點的集群中進(jìn)行了10輪迭代,,我們在正常操作和進(jìn)行第6輪迭代開始時一個節(jié)點發(fā)生故障的情況下對耗時進(jìn)行了對比。沒有任何失敗,,每輪迭代啟動了400個任務(wù)處理100G數(shù)據(jù),。
f11-iteration-k-means-spof
圖11 SPOF時K-means應(yīng)用程序迭代時間
第5輪迭代結(jié)束時大約耗時58秒,第6輪迭代時Kill掉一個節(jié)點,,該節(jié)點上的任務(wù)都被終止(包括緩存的分區(qū)數(shù)據(jù)),。Spark調(diào)度器調(diào)度這些任務(wù)在其他節(jié)點上重新并行運行,并且重新讀取基于Lineage信息重建的RDD輸入數(shù)據(jù)并進(jìn)行緩存,,這使得迭代計算耗時增加到80秒,。一旦丟失的RDD分區(qū)被重建,平均迭代時間又回落到58秒,。

7.4 內(nèi)存不足時表現(xiàn)

到現(xiàn)在為止,,我們能保證集群中的每個節(jié)點都有足夠的內(nèi)存去緩存迭代過程中使用的RDD,如果沒有足夠的內(nèi)存來緩存一個作業(yè)的工作集,,Spark又是如何運行的呢,?在實驗中,我們通過在每個節(jié)點上限制緩存RDD所需要的內(nèi)存資源來配置Spark,,在不同的緩存配置條件下執(zhí)行Logistic回歸,,結(jié)果如圖12。我們可以看出,,隨著緩存的減小,,性能平緩地下降。
f12-spark-performance-limit-cache-size-of-rdd
圖12 Spark上運行Logistic回歸的性能表現(xiàn)

7.5 基于Spark構(gòu)建的用戶應(yīng)用程序

In-Memory分析,。視頻分發(fā)公司Conviva使用Spark極大地提升了為客戶處理分析報告的速度,,以前基于Hadoop使用大約20個Hive[3]查詢來完成,這些查詢作用在相同的數(shù)據(jù)子集上(滿足用戶提供的條件),但是在不同分組的字段上執(zhí)行聚合操作(SUM,、AVG,、COUNT DISTINCT等)需要使用單獨的MapReduce作業(yè)。該公司使用Spark只需要將相關(guān)數(shù)據(jù)加載到內(nèi)存中一次,,然后運行上述聚合操作,,在Hadoop集群上處理200G壓縮數(shù)據(jù)并生成報耗時20小時,而使用Spark基于96G內(nèi)存的2個節(jié)點耗時30分鐘即可完成,,速度提升40倍,,主要是因為不需要再對每個作業(yè)重復(fù)地執(zhí)行解壓縮和過濾操作。

城市交通建模,。在Berkeley的Mobile Millennium項目[17]中,,基于一系列分散的汽車GPS監(jiān)測數(shù)據(jù),研究人員使用并行化機器學(xué)習(xí)算法來推算公路交通擁堵狀況,。數(shù)據(jù)來自市區(qū)10000個互聯(lián)的公路線路網(wǎng),,還有600000個由汽車GPS裝置采集到的樣本數(shù)據(jù),這些數(shù)據(jù)記錄了汽車在兩個地點之間行駛的時間(每一條路線的行駛時間可能跨多個公路線路網(wǎng)),。使用一個交通模型,,通過推算跨多個公路網(wǎng)行駛耗時預(yù)期,系統(tǒng)能夠估算擁堵狀況,。研究人員使用Spark實現(xiàn)了一個可迭代的EM算法,,其中包括向Worker節(jié)點廣播路線網(wǎng)絡(luò)信息,在E和M階段之間執(zhí)行reduceByKey操作,,應(yīng)用從20個節(jié)點擴(kuò)展到80個節(jié)點(每個節(jié)點4核),,如圖13(a)所示:
f13-run-time-of-per-iteration
圖13 每輪迭代運行時間(a)交通建模應(yīng)用程序(b)基于Spark的社交網(wǎng)絡(luò)的Spam分類
社交網(wǎng)絡(luò)Spam分類。Berkeley的Monarch項目[31]使用Spark識別Twitter消息上的Spam鏈接,。他們在Spark上實現(xiàn)了一個類似7.1小節(jié)中示例的Logistic回歸分類器,,不同的是使用分布式的reduceByKey操作并行對梯度向量求和。圖13(b)顯示了基于50G數(shù)據(jù)子集訓(xùn)練訓(xùn)練分類器的結(jié)果,,整個數(shù)據(jù)集是250000的URL,、至少10^7個與網(wǎng)絡(luò)相關(guān)的特征/維度,內(nèi)容,、詞性與訪問一個URL的頁面相關(guān),。隨著節(jié)點的增加,這并不像交通應(yīng)用程序那樣近似線性,,主要是因為每輪迭代的固定通信代價較高,。

7.6 交互式數(shù)據(jù)挖掘

為了展示Spark交互式處理大數(shù)據(jù)集的能力,我們在100個m2.4xlarge EC2實例(8核68G內(nèi)存)上使用Spark分析1TB從2008-10到2009-4這段時間的Wikipedia頁面瀏覽日志數(shù)據(jù),,在整個輸入數(shù)據(jù)集上簡單地查詢?nèi)缦聝?nèi)容以獲取頁面瀏覽總數(shù):(1)全部頁面,;(2)頁面的標(biāo)題能精確匹配給定的關(guān)鍵詞;(3)頁面的標(biāo)題能部分匹配給定的關(guān)鍵詞。
f14-response-time-of-interactive-queries
圖14 顯示了分別在整個,、1/2,、1/10的數(shù)據(jù)上查詢的響應(yīng)時間,甚至1TB數(shù)據(jù)在Spark上查詢僅耗時5-7秒,,這比直接操作磁盤數(shù)據(jù)快幾個數(shù)量級,。例如,從磁盤上查詢1TB數(shù)據(jù)耗時170秒,,這表明了RDD緩存使得Spark成為一個交互式數(shù)據(jù)挖掘的強大工具,。

8. 相關(guān)工作

分布式共享內(nèi)存(DSM)。RDD可以看成是一個基于DSM研究[24]得到的抽象,。在2.5節(jié)我們討論過,,RDD提供了一個比DSM限制更嚴(yán)格的編程模型,并能在節(jié)點失效時高效地重建數(shù)據(jù)集,。DSM通過檢查點[19]實現(xiàn)容錯,,而Spark使用Lineage重建RDD分區(qū),這些分區(qū)可以在不同的節(jié)點上重新并行處理,,而不需要將整個程序回退到檢查點再重新運行,。RDD能夠像MapReduce一樣將計算推向數(shù)據(jù)[12],并通過推測執(zhí)行來解決某些任務(wù)計算進(jìn)度落后的問題,,推測執(zhí)行在一般的DSM系統(tǒng)上是很難實現(xiàn)的。

In-Memory集群計算,。Piccolo[28]是一個基于可變的,、In-Memory的分布式表的集群編程模型。因為Piccolo允許讀寫表中的記錄,,它具有與DSM類似的恢復(fù)機制,,需要檢查點和回滾,但是不能推測執(zhí)行,,也沒有提供類似groupBy,、sort等更高級別的數(shù)據(jù)流算子,用戶只能直接讀取表單元數(shù)據(jù)來實現(xiàn),??梢姡琍iccolo是比Spark更低級別的編程模型,,但是比DSM要高級,。

RAMClouds[26]適合作為Web應(yīng)用的存儲系統(tǒng),它同樣提供了細(xì)粒度讀寫操作,,所以需要通過記錄日志來實現(xiàn)容錯,。

數(shù)據(jù)流系統(tǒng)。RDD借鑒了DryadLINQ[34]、Pig[25]和FlumeJava[9]的“并行收集”編程模型,,通過允許用戶顯式地將未序列化的對象保存在內(nèi)存中,,以此來控制分區(qū)和基于key隨機查找,從而有效地支持基于工作集的應(yīng)用,。RDD保留了那些數(shù)據(jù)流系統(tǒng)更高級別的編程特性,,這對那些開發(fā)人員來說也比較熟悉,而且,,RDD也能夠支持更多類型的應(yīng)用,。RDD新增的擴(kuò)展,從概念上看很簡單,,其中Spark是第一個使用了這些特性的系統(tǒng),,類似DryadLINQ編程模型,能夠有效地支持基于工作集的應(yīng)用,。

面向基于工作集的應(yīng)用,,已經(jīng)開發(fā)了一些專用系統(tǒng),像Twister[13],、HaLoop[8]實現(xiàn)了一個支持迭代的MapReduce模型,;Pregel[21],支持圖應(yīng)用的BSP計算模型,。RDD是一個更通用的抽象,,它能夠描述支持迭代的MapReduce、Pregel,,還有現(xiàn)有一些系統(tǒng)未能處理的應(yīng)用,,如交互式數(shù)據(jù)挖掘。特別地,,它能夠讓開發(fā)人員動態(tài)地選擇操作來運行在RDD上(如查看查詢的結(jié)果以決定下一步運行哪個查詢),,而不是提供一系列固定的步驟去執(zhí)行迭代,RDD還支持更多類型的轉(zhuǎn)換,。

最后,,Dremel[22]是一個低延遲查詢引擎,它面向基于磁盤存儲的大數(shù)據(jù)集,,這類數(shù)據(jù)集是把嵌套記錄數(shù)據(jù)生成基于列的格式,。這種格式的數(shù)據(jù)也能夠保存為RDD并在Spark系統(tǒng)中使用,但Spark也具備將數(shù)據(jù)加載到內(nèi)存來實現(xiàn)快速查詢的能力,。

Lineage,。我們通過參考[6]到[10]做過調(diào)研,在科學(xué)計算和數(shù)據(jù)庫領(lǐng)域,,對于一些應(yīng)用,,如需要解釋結(jié)果以及允許被重新生成,、工作流中發(fā)現(xiàn)了bug或者數(shù)據(jù)集丟失需要重新處理數(shù)據(jù),表示數(shù)據(jù)的Lineage和原始信息一直以來都是一個研究課題,。RDD提供了一個受限的編程模型,,在這個模型中使用細(xì)粒度的Lineage來表示是非常容易的,因此它可以被用于容錯,。

緩存系統(tǒng),。Nectar[14]能夠通過識別帶有程序分析的子表達(dá)式,跨DryadLINQ作業(yè)重用中間結(jié)果,,如果將這種能力加入到基于RDD的系統(tǒng)會非常有趣,。但是Nectar并沒有提供In-Memory緩存,也不能夠讓用戶顯式地控制應(yīng)該緩存那個數(shù)據(jù)集,,以及如何對其進(jìn)行分區(qū),。Ciel[23]同樣能夠記住任務(wù)結(jié)果,但不能提供In-Memory緩存并顯式控制它,。

語言迭代,。DryadLINQ[34]能夠使用LINQ獲取到表達(dá)式樹然后在集群上運行,Spark系統(tǒng)的語言集成與它很類似,。不像DryadLINQ,,Spark允許用戶顯式地跨查詢將RDD存儲到內(nèi)存中,并通過控制分區(qū)來優(yōu)化通信,。Spark支持交互式處理,,但DryadLINQ卻不支持。

關(guān)系數(shù)據(jù)庫,。從概念上看,,RDD類似于數(shù)據(jù)庫中的視圖,緩存RDD類似于物化視圖[29],。然而,數(shù)據(jù)庫像DSM系統(tǒng)一樣,,允許典型地讀寫所有記錄,,通過記錄操作和數(shù)據(jù)的日志來實現(xiàn)容錯,還需要花費額外的開銷來維護(hù)一致性,。RDD編程模型通過增加更多限制來避免這些開銷,。

9. 總結(jié)

我們提出的RDD是一個面向,運行在普通商用機集群之上并行數(shù)據(jù)處理應(yīng)用的分布式內(nèi)存抽象,。RDD廣泛支持基于工作集的應(yīng)用,,包括迭代式機器學(xué)習(xí)和圖算法,還有交互式數(shù)據(jù)挖掘,,然而它保留了數(shù)據(jù)流模型中引人注目的特點,,如自動容錯恢復(fù),,處理執(zhí)行進(jìn)度落后的任務(wù),以及感知調(diào)度,。它是通過限制編程模型,,進(jìn)而允許高效地重建RDD分區(qū)來實現(xiàn)的。RDD實現(xiàn)處理迭代式作業(yè)的速度超過Hadoop大約20倍,,而且還能夠交互式查詢數(shù)百G數(shù)據(jù),。

致謝

首先感謝Spark用戶,包括Timothy Hunter,、Lester Mackey,、Dilip Joseph、Jibin Zhan和Teodor Moldovan,,他們在真實的應(yīng)用中使用Spark,,提出了寶貴的建議,同時也發(fā)現(xiàn)了一些新的研究挑戰(zhàn),。這次研究離不開以下組織或團(tuán)體的大力支持:Berkeley AMP Lab創(chuàng)立贊助者Google和SAP,,AMP Lab贊助者Amazon Web Services、Cloudera,、Huawei,、IBM、Intel,、Microsoft,、NEC、NetApp和VMWare,,國家配套資金加州MICRO項目(助學(xué)金 06-152,,07-010),國家自然科學(xué)基金 (批準(zhǔn) CNS-0509559),,加州大學(xué)工業(yè)/大學(xué)合作研究項目 (UC Discovery)授予的COM07-10240,,以及自然科學(xué)和加拿大工程研究理事會。

參考

[1] Amazon EC2. http://aws.amazon.com/ec2.
[2] Apache Hadoop. http://hadoop..
[3] Apache Hive. http://hadoop./hive.
[4] Applications powered by Hadoop. http://wiki./hadoop/PoweredBy.
[5] Scala. http://www..
[6] R. Bose and J. Frew. Lineage retrieval for scientific data processing: a survey. ACM Computing Surveys, 37:1–28,
2005.
[7] S. Brin and L. Page. The anatomy of a large-scale hypertextual web search engine. In WWW, 1998.
[8] Y. Bu, B. Howe, M. Balazinska, and M. D. Ernst. HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 3:285–296, September 2010.
[9] C. Chambers, A. Raniwala, F. Perry, S. Adams, R. R. Henry, R. Bradshaw, and N. Weizenbaum. Flumejava: easy, efficient data-parallel pipelines. In Proceedings of the 2010 ACM SIGPLAN conference on Programming language design and implementation, PLDI ’10. ACM, 2010.
[10] J. Cheney, L. Chiticariu, and W.-C. Tan. Provenance in databases: Why, how, and where. Foundations and Trends in Databases, 1(4):379–474, 2009.
[11] C. T. Chu, S. K. Kim, Y. A. Lin, Y. Yu, G. R. Bradski, A. Y. Ng, and K. Olukotun. Map-reduce for machine learning on multicore. In NIPS ’06, pages 281–288. MIT Press, 2006.
[12] J. Dean and S. Ghemawat. MapReduce: Simplified data processing on large clusters. In OSDI, 2004.
[13] J. Ekanayake, H. Li, B. Zhang, T. Gunarathne, S.-H. Bae, J. Qiu, and G. Fox. Twister: a runtime for iterative mapreduce. In HPDC ’10, 2010.
[14] P. K. Gunda, L. Ravindranath, C. A. Thekkath, Y. Yu, and L. Zhuang. Nectar: automatic management of data and computation in datacenters. In OSDI ’10, 2010.
[15] T. Hastie, R. Tibshirani, and J. Friedman. The Elements of Statistical Learning: Data Mining, Inference, and Prediction. Springer Publishing Company, New York, NY, 2009.
[16] U. Hoelzle and L. A. Barroso. The Datacenter as a Computer: An Introduction to the Design of Warehouse-Scale Machines. Morgan and Claypool Publishers, 1st edition, 2009.
[17] Mobile Millennium Project. http://traffic..
[18] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In EuroSys 07, 2007.
[19] A.-M. Kermarrec, G. Cabillic, A. Gefflaut, C. Morin, and I. Puaut. A recoverable distributed shared memory integrating coherence and recoverability. In FTCS ’95, 1995.
[20] S. Y. Ko, I. Hoque, B. Cho, and I. Gupta. On availability of intermediate data in cloud computations. In HotOS
’09, 2009.
[21] G. Malewicz, M. H. Austern, A. J. Bik, J. C. Dehnert, I. Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In SIGMOD, pages 135–146, 2010.
[22] S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton, and T. Vassilakis. Dremel: interactive analysis of web-scale datasets. Proc. VLDB Endow., 3:330–339, Sept 2010.
[23] D. G. Murray, M. Schwarzkopf, C. Smowton, S. Smith, A. Madhavapeddy, and S. Hand. Ciel: a universal execution engine for distributed data-flow computing. In NSDI, 2011.
[24] B. Nitzberg and V. Lo. Distributed shared memory: a survey of issues and algorithms. Computer, 24(8):52–60, aug 1991.
[25] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig latin: a not-so-foreign language for data processing. In SIGMOD ’08, pages 1099–1110.
[26] J. Ousterhout, P. Agrawal, D. Erickson, C. Kozyrakis, J. Leverich, D. Mazi ` eres, S. Mitra, A. Narayanan, G. Parulkar, M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for RAMClouds: scalable high-performance storage entirely in dram. SIGOPS Oper. Syst. Rev., 43:92–105, Jan 2010.
[27] D. Peng and F. Dabek. Large-scale incremental processing using distributed transactions and notifications. In OSDI 2010.
[28] R. Power and J. Li. Piccolo: Building fast, distributed programs with partitioned tables. In Proc. OSDI 2010,
2010.
[29] R. Ramakrishnan and J. Gehrke. Database Management Systems. McGraw-Hill, Inc., 3 edition, 2003.
[30] D. Spiewak and T. Zhao. ScalaQL: Language-integrated database queries for scala. In SLE, pages 154–163, 2009.
[31] K. Thomas, C. Grier, J. Ma, V. Paxson, and D. Song. Design and evaluation of a real-time URL spam filtering service. In IEEE Symposium on Security and Privacy, 2011.
[32] L. G. Valiant. A bridging model for parallel computation. Commun. ACM, 33:103–111, August 1990.
[33] J. W. Young. A first order approximation to the optimum checkpoint interval. Commun. ACM, 17:530–531, Sept 1974.
[34] Y. Yu, M. Isard, D. Fetterly, M. Budiu, U. Erlingsson, P. K. Gunda, and J. Currey. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In OSDI ’08, 2008.

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點擊一鍵舉報,。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多