在 IBM Bluemix 云平臺(tái)上開發(fā)并部署您的下一個(gè)應(yīng)用,。 引言使用機(jī)器學(xué)習(xí) (Machine Learning) 技術(shù)和方法來(lái)解決實(shí)際問題,已經(jīng)被成功應(yīng)用到多個(gè)領(lǐng)域,,我們經(jīng)常能夠看到的實(shí)例有個(gè)性推薦系統(tǒng),,金融反欺詐,自然語(yǔ)言處理和機(jī)器翻譯,,模式識(shí)別,智能控制等,。一個(gè)典型的機(jī)器學(xué)習(xí)機(jī)器學(xué)習(xí)過程通常會(huì)包含:源數(shù)據(jù) ETL,數(shù)據(jù)預(yù)處理,,指標(biāo)提取,模型訓(xùn)練與交叉驗(yàn)證,,新數(shù)據(jù)預(yù)測(cè)等,。我們可以看到這是一個(gè)包含多個(gè)步驟的流水線式工作,,也就是說(shuō)數(shù)據(jù)從收集開始,要經(jīng)歷多個(gè)步驟,,才能得到我們需要的輸出。在本系列第 4 部分已經(jīng)向大家介紹了 Spark MLlib 機(jī)器學(xué)習(xí)庫(kù), 雖然 MLlib 已經(jīng)足夠簡(jiǎn)單易用,,但是如果目標(biāo)數(shù)據(jù)集結(jié)構(gòu)復(fù)雜需要多次處理,或者是對(duì)新數(shù)據(jù)進(jìn)行預(yù)測(cè)的時(shí)候需要結(jié)合多個(gè)已經(jīng)訓(xùn)練好的單個(gè)模型進(jìn)行綜合預(yù)測(cè) (集成學(xué)習(xí)的思想),,那么使用 MLlib 將會(huì)讓程序結(jié)構(gòu)復(fù)雜,難于理解和實(shí)現(xiàn),。值得慶幸的是,在 Spark 的生態(tài)系統(tǒng)里,,一個(gè)可以用于構(gòu)建復(fù)雜機(jī)器學(xué)習(xí)工作流應(yīng)用的新庫(kù)已經(jīng)出現(xiàn)了,,它就是 Spark 1.2 版本之后引入的 ML Pipeline,經(jīng)過幾個(gè)版本的發(fā)展,,截止目前的 1.5.1 版本已經(jīng)變得足夠穩(wěn)定易用了。本文將向讀者詳細(xì)地介紹 Spark ML Pipeline 的設(shè)計(jì)思想和基本概念,,以及如何使用 ML Pipeline 提供的 API 庫(kù)編寫一個(gè)解決分類預(yù)測(cè)問題的 Pipeline 式應(yīng)用程序。相信通過本文的學(xué)習(xí),,讀者可以較為深入的理解 ML Pipeline,進(jìn)而將它推廣和應(yīng)用到更多復(fù)雜問題的解決方案上去,。 關(guān)于 ML PipelineSpark ML Pipeline 的出現(xiàn),,是受到了 scikit-learn 項(xiàng)目的啟發(fā),并且總結(jié)了 MLlib 在處理復(fù)雜機(jī)器學(xué)習(xí)問題上的弊端,,旨在向用戶提供基于 DataFrame 之上的更加高層次的 API 庫(kù),以更加方便的構(gòu)建復(fù)雜的機(jī)器學(xué)習(xí)工作流式應(yīng)用,。一個(gè) Pipeline 在結(jié)構(gòu)上會(huì)包含一個(gè)或多個(gè) PipelineStage,每一個(gè) PipelineStage 都會(huì)完成一個(gè)任務(wù),,如數(shù)據(jù)集處理轉(zhuǎn)化,模型訓(xùn)練,,參數(shù)設(shè)置或數(shù)據(jù)預(yù)測(cè)等,這樣的 PipelineStage 在 ML 里按照處理問題類型的不同都有相應(yīng)的定義和實(shí)現(xiàn),。接下來(lái),,我們先來(lái)了解幾個(gè)重要概念,。
關(guān)于 DataFrame 其實(shí)我們已經(jīng)在本系列第 3 部分介紹過了,,它較之 RDD,,包含了 schema 信息,,更類似傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格,。它被 ML Pipeline 用來(lái)存儲(chǔ)源數(shù)據(jù)。 DataFrame 可以被用來(lái)保存各種類型的數(shù)據(jù),,如我們可以把特征向量存儲(chǔ)在 DataFrame 的一列中,這樣用起來(lái)是非常方便的,。
Transformer 中文可以被翻譯成轉(zhuǎn)換器,是一個(gè) PipelineStage,,實(shí)現(xiàn)上也是繼承自 PipelineStage 類,主要是用來(lái)把 一個(gè) DataFrame 轉(zhuǎn)換成另一個(gè) DataFrame,,比如一個(gè)模型就是一個(gè) Transformer,,因?yàn)樗梢园? 一個(gè)不包含預(yù)測(cè)標(biāo)簽的測(cè)試數(shù)據(jù)集 DataFrame 打上標(biāo)簽轉(zhuǎn)化成另一個(gè)包含預(yù)測(cè)標(biāo)簽的 DataFrame,顯然這樣的結(jié)果集可以被用來(lái)做分析結(jié)果的可視化,。
Estimator 中文可以被翻譯成評(píng)估器或適配器,,在 Pipeline 里通常是被用來(lái)操作 DataFrame 數(shù)據(jù)并生產(chǎn)一個(gè) Transformer,如一個(gè)隨機(jī)森林算法就是一個(gè) Estimator,,因?yàn)樗梢酝ㄟ^訓(xùn)練特征數(shù)據(jù)而得到一個(gè)隨機(jī)森林模型,。實(shí)現(xiàn)上 Estimator 也是繼承自 PipelineStage 類,。
Parameter 被用來(lái)設(shè)置 Transformer 或者 Estimator 的參數(shù)。 要構(gòu)建一個(gè) Pipeline,,首先我們需要定義 Pipeline 中的各個(gè) PipelineStage,如指標(biāo)提取和轉(zhuǎn)換模型訓(xùn)練等。有了這些處理特定問題的 Transformer 和 Estimator,,我們就可以按照具體的處理邏輯來(lái)有序的組織 PipelineStages 并創(chuàng)建一個(gè) Pipeline,如 val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…)),。然后就可以把訓(xùn)練數(shù)據(jù)集作為入?yún)⒉⒄{(diào)用 Pipelin 實(shí)例的 fit 方法來(lái)開始以流的方式來(lái)處理源訓(xùn)練數(shù)據(jù),這個(gè)調(diào)用會(huì)返回一個(gè) PipelineModel 類實(shí)例,,進(jìn)而被用來(lái)預(yù)測(cè)測(cè)試數(shù)據(jù)的標(biāo)簽,它是一個(gè) Transformer,。 隨機(jī)森林及 ML 的實(shí)現(xiàn)隨機(jī)森林構(gòu)建于決策樹之上,顧名思義,,就是隨機(jī)的構(gòu)建一個(gè)包含多個(gè)決策樹的森林。隨機(jī)森林里的決策樹之間是獨(dú)立的,,在隨機(jī)森林模型構(gòu)建好以后,對(duì)于新來(lái)的測(cè)試樣本數(shù)據(jù),,隨機(jī)森林模型會(huì)讓其中的每個(gè)決策樹分別做一次預(yù)測(cè),,然后統(tǒng)計(jì)出現(xiàn)此處最多的預(yù)測(cè)標(biāo)簽,并將它作為最終的預(yù)測(cè)標(biāo)簽,。隨機(jī)森林算法運(yùn)用的就是集成學(xué)習(xí)的思想,在實(shí)踐中,,隨機(jī)森林往往都有很好表現(xiàn),并且多次預(yù)測(cè)結(jié)果穩(wěn)定并且精度非常高,,也不容易出現(xiàn)過擬合的問題,。也是筆者最喜歡并且最常用的一種機(jī)器學(xué)習(xí)算法,。 本文并不會(huì)重點(diǎn)介紹隨機(jī)森林的基本理論,因?yàn)榫W(wǎng)上這樣的文章已經(jīng)很多了,,本文將把重點(diǎn)放在對(duì) Spark ML 中隨機(jī)森林的實(shí)現(xiàn)以及可調(diào)參數(shù)的介紹,。關(guān)于隨機(jī)森林算法的詳細(xì)介紹大家可以參考維基百科上的隨機(jī)森林介紹。 Spark ML 中隨機(jī)森林實(shí)現(xiàn)是在 RandomForestClassifier 類中,,位于 org.apache.spark.ml. classification 包中,該實(shí)現(xiàn)中支持設(shè)置的主要參數(shù)如下:
訓(xùn)練數(shù)據(jù)集 DataFrame 中存儲(chǔ)特征數(shù)據(jù)的列名,。
標(biāo)簽列的名稱。
樹節(jié)點(diǎn)選擇的不純度的衡量指標(biāo),,取值可以是”entroy”或“gini”, 默認(rèn)是”gini”。
離散連續(xù)性變量時(shí)最大的分箱數(shù),,默認(rèn)是 32,。理論上箱數(shù)越大粒度就越細(xì),,但是針對(duì)特定的數(shù)據(jù)集總有一個(gè)合理的箱數(shù)。
樹的最大深度,,默認(rèn)值是 5。
隨機(jī)森林需要訓(xùn)練的樹的個(gè)數(shù),,默認(rèn)值是 20。
算法預(yù)測(cè)結(jié)果的存儲(chǔ)列的名稱, 默認(rèn)是”prediction”,。
原始的算法預(yù)測(cè)結(jié)果的存儲(chǔ)列的名稱, 默認(rèn)是”rawPrediction”
類別預(yù)測(cè)結(jié)果的條件概率值存儲(chǔ)列的名稱, 默認(rèn)值是”probability” 在后文中大家可以看到如何在程序中設(shè)置這些參數(shù),??梢哉{(diào)用 RandomForestClassifier.setXXX 方法或者在 ParamMap 里設(shè)定參數(shù),然后再調(diào)用 RandomForestClassifier.fit 方法時(shí)傳入 ParamMap 實(shí)例,,如: RandomForestClassifier 的 fit 方法從源頭上來(lái)講,是來(lái)自 Predictor 類 (Estimator 的子類),,Predictor 類的 fit 方法設(shè)計(jì)和實(shí)現(xiàn)上實(shí)際上是采用了模板方法的設(shè)計(jì)模式,具體會(huì)調(diào)用實(shí)現(xiàn)類的 train 方法 圖 1. Predictor 類的 fit 方法實(shí)現(xiàn)預(yù)覽所以對(duì)于 RandomForestClassifier 類我們最需要關(guān)注的就是 train 方法,,其中包含具體從源數(shù)據(jù) DataFrame 訓(xùn)練一個(gè)隨機(jī)森林模型的過程,。train 方法在提取出 DataFrame 數(shù)據(jù)集中的 label 和 features 數(shù)據(jù)之后,進(jìn)一步調(diào)用 RandomForest.run 方法去真正的開始訓(xùn)練隨機(jī)森林模型,,訓(xùn)練結(jié)束后會(huì)返回一個(gè) RandomForestClassificationModel 類實(shí)例,這是一個(gè) Transformer,,會(huì)被用來(lái)預(yù)測(cè)測(cè)試數(shù)據(jù)集。 圖 2. RandomForestClassifier 類的 train 方法實(shí)現(xiàn)預(yù)覽對(duì)于 RandomForest 類的 run 方法的具體實(shí)現(xiàn)邏輯,,已經(jīng)在 developerWorks 的“Spark 隨機(jī)森林算法原理,、源碼分析及案例實(shí)戰(zhàn)” 一文中有詳細(xì)介紹,,為了避免內(nèi)容沖突,本文的內(nèi)容將重點(diǎn)放在 ML Pipeline 的實(shí)現(xiàn)層次關(guān)系上,,在這里不做贅述。 目標(biāo)數(shù)據(jù)集預(yù)覽本文所使用的測(cè)試數(shù)據(jù)集來(lái)自 UCI 的 banknote authentication data set ,,這是一個(gè)從紙幣鑒別過程中的圖片里提取的數(shù)據(jù)集,總共包含五個(gè)列,,前 4 列是指標(biāo)值 (連續(xù)型),,最后一列是真假標(biāo)識(shí),。 圖 3. 測(cè)試數(shù)據(jù)集格式四列依次是小波變換圖像的方差,,小波變換圖像的偏態(tài),小波變換圖像的峰度,,圖像熵,類別標(biāo)簽,。其實(shí)讀者并不需要知道什么是小波變換及其相關(guān)改變,只需要知道這是四個(gè)特征指標(biāo)的值,我們將根據(jù)這些指標(biāo)訓(xùn)練模型使用模型預(yù)測(cè)類別,。對(duì)于該數(shù)據(jù)集的更多信息,讀者可以參考 UCI 官網(wǎng)的描述,。 案例分析與編碼實(shí)現(xiàn)前面提到,,本文的目的是使用 Spark ML Pipeline 構(gòu)建一個(gè)對(duì)目標(biāo)數(shù)據(jù)集進(jìn)行分類預(yù)測(cè)的機(jī)器學(xué)習(xí)工作流,,案例背景已經(jīng)相當(dāng)清晰,,在了解了數(shù)據(jù)集本身和 ML Pipeline 的相關(guān)知識(shí)后,,接下來(lái)就是編程實(shí)現(xiàn)了。關(guān)于實(shí)現(xiàn)基本思路和關(guān)鍵的 11 個(gè)步驟筆者已經(jīng)在代碼中做了詳細(xì)解釋,,為了方便讀者理解,這里特別的把該實(shí)例的 Pipeline 里包含的 4 個(gè) Stage 重點(diǎn)介紹下,。 這四個(gè) Stage 分別對(duì)應(yīng)代碼注釋里的步驟 2-5,,作用如下: 第一個(gè),,使用 StringIndexer 去把源數(shù)據(jù)里的字符 Label,按照 Label 出現(xiàn)的頻次對(duì)其進(jìn)行序列編碼, 如,,0,1,2,…,。在本例的數(shù)據(jù)中,可能這個(gè)步驟的作用不甚明顯,,因?yàn)槲覀兊臄?shù)據(jù)格式良好,,Label 本身也只有兩種,,并且已經(jīng)是類序列編碼的”0”和”1”格式。但是對(duì)于多分類問題或者是 Label 本身是字符串的編碼方式,,如”High”,”Low”,”Medium”等,,那么這個(gè)步驟就很有用,轉(zhuǎn)換后的格式,,才能被 Spark 更好的處理。 第二個(gè),,使用 VectorAssembler 從源數(shù)據(jù)中提取特征指標(biāo)數(shù)據(jù),這是一個(gè)比較典型且通用的步驟,,因?yàn)槲覀兊脑紨?shù)據(jù)集里,,經(jīng)常會(huì)包含一些非指標(biāo)數(shù)據(jù),,如 ID,Description 等,。 第三個(gè),創(chuàng)建一個(gè)隨機(jī)森林分類器 RandomForestClassifier 實(shí)例,,并設(shè)定相關(guān)參數(shù),,主要是告訴隨機(jī)森林算法輸入 DataFrame 數(shù)據(jù)里哪個(gè)列是特征向量,,哪個(gè)是類別標(biāo)識(shí),并告訴隨機(jī)森林分類器訓(xùn)練 5 棵獨(dú)立的子樹,。 第四個(gè),我們使用 IndexToString Transformer 去把之前的序列編碼后的 Label 轉(zhuǎn)化成原始的 Label,,恢復(fù)之前的可讀性比較高的 Label,這樣不論是存儲(chǔ)還是顯示模型的測(cè)試結(jié)果,,可讀性都會(huì)比較高,。 這幾個(gè) Stage 都會(huì)被用來(lái)構(gòu)建 Pipeline 實(shí)例,,并且會(huì)按照順序執(zhí)行,最終我們根據(jù)得到的 PipelineModel 實(shí)例,,進(jìn)一步調(diào)用其 transform 方法,,去用訓(xùn)練好的模型預(yù)測(cè)測(cè)試數(shù)據(jù)集的分類,。 清單 1. 示例程序源代碼import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification._ import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler} import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object ClassificationPipeline { def main(args: Array[String]) { if (args.length < 1){ println("Usage:ClassificationPipeline inputDataFile") sys.exit(1) } val conf = new SparkConf().setAppName("Classification with ML Pipeline") val sc = new SparkContext(conf) val sqlCtx = new SQLContext(sc) /** Step 1 * Read the source data file and convert it to be a dataframe with columns named. * 3.6216,8.6661,-2.8073,-0.44699,0 * 4.5459,8.1674,-2.4586,-1.4621,0 * 3.866,-2.6383,1.9242,0.10645,0 * 3.4566,9.5228,-4.0112,-3.5944,0 * 0.32924,-4.4552,4.5718,-0.9888,0 * ... ... */ val parsedRDD = sc.textFile(args(0)).map(_.split(",")).map(eachRow => { val a = eachRow.map(x => x.toDouble) (a(0),a(1),a(2),a(3),a(4)) }) val df = sqlCtx.createDataFrame(parsedRDD).toDF( "f0","f1","f2","f3","label").cache() /** * * Step 2 * StringIndexer encodes a string column of labels * to a column of label indices. The indices are in [0, numLabels), * ordered by label frequencies. * This can help detect label in raw data and give it an index automatically. * So that it can be easily processed by existing spark machine learning algorithms. * */ val labelIndexer = new StringIndexer() .setInputCol("label") .setOutputCol("indexedLabel") .fit(df) /** * Step 3 * Define a VectorAssembler transformer to transform source features data to be a vector * This is helpful when raw input data contains non-feature columns, and it is common for * such a input data file to contain columns such as "ID", "Date", etc. */ val vectorAssembler = new VectorAssembler() .setInputCols(Array("f0","f1","f2","f3")) .setOutputCol("featureVector") /** * Step 4 * Create RandomForestClassifier instance and set the input parameters. * Here we will use 5 trees Random Forest to train on input data. */ val rfClassifier = new RandomForestClassifier() .setLabelCol("indexedLabel") .setFeaturesCol("featureVector") .setNumTrees(5) /** * Step 5 * Convert indexed class labels back to original one so that it can be easily understood when we * need to display or save the prediction result to a file. */ val labelConverter = new IndexToString() .setInputCol("prediction") .setOutputCol("predictedLabel") .setLabels(labelIndexer.labels) //Step 6 //Randomly split the input data by 8:2, while 80% is for training, the rest is for testing. val Array(trainingData, testData) = df.randomSplit(Array(0.8, 0.2)) /** * Step 7 * Create a ML pipeline which is constructed by for 4 PipelineStage objects. * and then call fit method to perform defined operations on training data. */ val pipeline = new Pipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter)) val model = pipeline.fit(trainingData) /** *Step 8 *Perform predictions about testing data. This transform method will return a result DataFrame *with new prediction column appended towards previous DataFrame. * * */ val predictionResultDF = model.transform(testData) /** * Step 9 * Select features,label,and predicted label from the DataFrame to display. * We only show 20 rows, it is just for reference. */ predictionResultDF.select("f0","f1","f2","f3","label","predictedLabel").show(20) /** * Step 10 * The evaluator code is used to compute the prediction accuracy, this is * usually a valuable feature to estimate prediction accuracy the trained model. */ val evaluator = new MulticlassClassificationEvaluator() .setLabelCol("label") .setPredictionCol("prediction") .setMetricName("precision") val predictionAccuracy = evaluator.evaluate(predictionResultDF) println("Testing Error = " + (1.0 - predictionAccuracy)) /** * Step 11(Optional) * You can choose to print or save the the model structure. */ val randomForestModel = model.stages(2).asInstanceOf[RandomForestClassificationModel] println("Trained Random Forest Model is:\n" + randomForestModel.toDebugString) } } 運(yùn)行示例程序在運(yùn)行程序之前,,讀者需要把目標(biāo)數(shù)據(jù)集上傳至你的 HDFS 上,,并把測(cè)試程序打成 jar 包,。 清單 2. 示例程序運(yùn)行命令及參數(shù)示例./spark-submit --class com.ibm.spark.exercise.ml.ClassificationPipeline \ 當(dāng)然如果你想采用“yarn-cluster”或者”yarn-client”的方式運(yùn)行,,運(yùn)行命令會(huì)有稍許不同,并且根據(jù)你的集群狀況的不同,,可能命令也會(huì)發(fā)生稍許變化,。 圖 4. 示例程序運(yùn)行結(jié)果預(yù)覽 (部分)從運(yùn)行結(jié)果的預(yù)測(cè)錯(cuò)誤來(lái)看,,預(yù)測(cè)正確率基本可以達(dá)到近 98%,當(dāng)然你可以按 9:1 去劃分訓(xùn)練和測(cè)試數(shù)據(jù)集,,這樣可以得到更好的精確度。由此看來(lái),,隨機(jī)森林算法的預(yù)測(cè)精度確實(shí)非常的好,當(dāng)然這也取決于該組數(shù)據(jù)的模式比較明顯,,特征數(shù)據(jù)質(zhì)量比較好,。 注意事項(xiàng)
總結(jié)本文向讀者較為詳細(xì)的介紹了 ML Pipeline 的基本概念和編程實(shí)現(xiàn)步驟,大家可以看到,,較之 MLlib,,ML Pipeline 在結(jié)構(gòu)和邏輯層次上確實(shí)是更加清晰了,。但是我認(rèn)為 MLlib 對(duì)于處理結(jié)構(gòu)相對(duì)簡(jiǎn)單的數(shù)據(jù)集其實(shí)依然具有優(yōu)勢(shì),可能剛開始更容易被理解接受,。另外從 Spark 的學(xué)習(xí)曲線上來(lái)講,剛開始大家接觸的都是 RDD,,對(duì) DataFrame 不甚了解,,所以對(duì)于初學(xué)者對(duì) MLlib 的方式其實(shí)更容易接受,。所以,應(yīng)該說(shuō) MLlib 和 ML Pipeline 都有各自的優(yōu)勢(shì)吧,。當(dāng)然,這更多是我個(gè)人的理解,。希望這篇文章可以對(duì)大家學(xué)習(xí) ML Pipeline 有幫助,,在閱讀過程中,,有任何不懂或者發(fā)現(xiàn)任何問題,,請(qǐng)留下您的評(píng)論,我會(huì)第一時(shí)間回答,,這樣也是一個(gè)交流學(xué)習(xí)的過程,,非常感謝。 |
|
來(lái)自: 昵稱16883405 > 《Spark》