一,、關(guān)于spark ml pipeline與機(jī)器學(xué)習(xí)
一個(gè)典型的機(jī)器學(xué)習(xí)構(gòu)建包含若干個(gè)過程
1、源數(shù)據(jù)ETL
2,、數(shù)據(jù)預(yù)處理
3,、特征選取
4、模型訓(xùn)練與驗(yàn)證
以上四個(gè)步驟可以抽象為一個(gè)包括多個(gè)步驟的流水線式工作,,從數(shù)據(jù)收集開始至輸出我們需要的最終結(jié)果,。因此,對(duì)以上多個(gè)步驟,、進(jìn)行抽象建模,,簡(jiǎn)化為流水線式工作流程則存在著可行性,對(duì)利用spark進(jìn)行機(jī)器學(xué)習(xí)的用戶來說,,流水線式機(jī)器學(xué)習(xí)比單個(gè)步驟獨(dú)立建模更加高效,、易用。
受 scikit-learn 項(xiàng)目的啟發(fā),并且總結(jié)了MLlib在處理復(fù)雜機(jī)器學(xué)習(xí)問題的弊端(主要為工作繁雜,,流程不清晰),,旨在向用戶提供基于DataFrame
之上的更加高層次的 API 庫,以更加方便的構(gòu)建復(fù)雜的機(jī)器學(xué)習(xí)工作流式應(yīng)用,。一個(gè)pipeline 在結(jié)構(gòu)上會(huì)包含一個(gè)或多個(gè)Stage,,每一個(gè)
Stage 都會(huì)完成一個(gè)任務(wù),如數(shù)據(jù)集處理轉(zhuǎn)化,,模型訓(xùn)練,,參數(shù)設(shè)置或數(shù)據(jù)預(yù)測(cè)等,這樣的Stage 在 ML 里按照處理問題類型的不同都有相應(yīng)的定義和實(shí)現(xiàn),。兩個(gè)主要的stage為Transformer和Estimator,。Transformer主要是用來操作一個(gè)DataFrame
數(shù)據(jù)并生成另外一個(gè)DataFrame 數(shù)據(jù),比如svm模型,、一個(gè)特征提取工具,,都可以抽象為一個(gè)Transformer。Estimator
則主要是用來做模型擬合用的,,用來生成一個(gè)Transformer,。可能這樣說比較難以理解,,下面就以一個(gè)完整的機(jī)器學(xué)習(xí)案例來說明spark ml pipeline是怎么構(gòu)建機(jī)器學(xué)習(xí)工作流的,。
二、使用spark
ml pipeline構(gòu)建機(jī)器學(xué)習(xí)工作流
在此以Kaggle數(shù)據(jù)競(jìng)賽Display Advertising Challenge的數(shù)據(jù)集(該數(shù)據(jù)集為利用用戶特征進(jìn)行廣告點(diǎn)擊預(yù)測(cè))開始,,利用spark ml pipeline構(gòu)建一個(gè)完整的機(jī)器學(xué)習(xí)工作流程,。
首先,,讀入樣本集,并將樣本集劃分為訓(xùn)練集與測(cè)試集:
- //使用file標(biāo)記文件路徑,,允許spark讀取本地文件
- String fileReadPath = "file:\\D:\\dac_sample\\dac_sample.txt";
- //使用textFile讀入數(shù)據(jù)
- SparkContext sc = Contexts.sparkContext;
- RDD<String> file = sc.textFile(fileReadPath,1);
- JavaRDD<String> sparkContent = file.toJavaRDD();
- JavaRDD<Row> sampleRow = sparkContent.map(new Function<String, Row>() {
- public Row call(String string) {
- String tempStr = string.replace("\t",",");
- String[] features = tempStr.split(",");
- int intLable= Integer.parseInt(features[0]);
- String intFeature1 = features[1];
- String intFeature2 = features[2]; String CatFeature1 = features[14];
- String CatFeature2 = features[15];
- return RowFactory.create(intLable, intFeature1, intFeature2, CatFeature1, CatFeature2);
- }
- });
-
-
- double[] weights = {0.8, 0.2};
- Long seed = 42L;
- JavaRDD<Row>[] sampleRows = sampleRow.randomSplit(weights,seed);
得到樣本集后,構(gòu)建出 DataFrame格式的數(shù)據(jù)供spark ml pipeline使用:
- List<StructField> fields = new ArrayList<StructField>();
- fields.add(DataTypes.createStructField("lable", DataTypes.IntegerType, false));
- fields.add(DataTypes.createStructField("intFeature1", DataTypes.StringType, true));
- fields.add(DataTypes.createStructField("intFeature2", DataTypes.StringType, true));
- fields.add(DataTypes.createStructField("CatFeature1", DataTypes.StringType, true));
- fields.add(DataTypes.createStructField("CatFeature2", DataTypes.StringType, true));
- //and so on
-
-
- StructType schema = DataTypes.createStructType(fields);
- DataFrame dfTrain = Contexts.hiveContext.createDataFrame(sampleRows[0], schema);//訓(xùn)練數(shù)據(jù)
- dfTrain.registerTempTable("tmpTable1");
- DataFrame dfTest = Contexts.hiveContext.createDataFrame(sampleRows[1], schema);//測(cè)試數(shù)據(jù)
- dfTest.registerTempTable("tmpTable2");
由于在dfTrain,、dfTest中所有的特征目前都為string類型,,而機(jī)器學(xué)習(xí)則要求其特征為numerical類型,在此需要對(duì)特征做轉(zhuǎn)換,,包括類型轉(zhuǎn)換和缺失值的處理,。
首先,將intFeature由string轉(zhuǎn)為double,cast()方法將表中指定列string類型轉(zhuǎn)換為double類型,,并生成新列并命名為intFeature1Temp,,
之后,需要刪除原來的數(shù)據(jù)列 并將新列重命名為intFeature1,,這樣,,就將string類型的特征轉(zhuǎn)換得到double類型的特征了。
- //Cast integer features from String to Double
- dfTest = dfTest.withColumn("intFeature1Temp",dfTest.col("intFeature1").cast("double"));
- dfTest = dfTest.drop("intFeature1").withColumnRenamed("intFeature1Temp","intFeature1");
如果intFeature特征是年齡或者特征等類型,,則需要進(jìn)行分箱操作,,將一個(gè)特征按照指定范圍進(jìn)行劃分:
- /*特征轉(zhuǎn)換,部分特征需要進(jìn)行分箱,,比如年齡,,進(jìn)行分段成成年未成年等 */
- double[] splitV = {0.0,16.0,Double.MAX_VALUE};
- Bucketizer bucketizer = new Bucketizer().setInputCol("").setOutputCol("").setSplits(splitV);
再次,需要將categorical 類型的特征轉(zhuǎn)換為numerical類型,。主要包括兩個(gè)步驟,,缺失值處理和編碼轉(zhuǎn)換。
缺失值處理方面,,可以使用全局的NA來統(tǒng)一標(biāo)記缺失值:
- /*將categoricalb類型的變量的缺失值使用NA值填充*/
- String[] strCols = {"CatFeature1","CatFeature2"};
- dfTrain = dfTrain.na().fill("NA",strCols);
- dfTest = dfTest.na().fill("NA",strCols);
缺失值處理完成之后,,就可以正式的對(duì)categorical類型的特征進(jìn)行numerical轉(zhuǎn)換了。在spark
ml中,,可以借助StringIndexer和oneHotEncoder完成
這一任務(wù):
- // StringIndexer oneHotEncoder 將 categorical變量轉(zhuǎn)換為 numerical 變量
- // 如某列特征為星期幾,、天氣等等特征,則轉(zhuǎn)換為七個(gè)0-1特征
- StringIndexer cat1Index = new StringIndexer().setInputCol("CatFeature1").setOutputCol("indexedCat1").setHandleInvalid("skip");
- OneHotEncoder cat1Encoder = new OneHotEncoder().setInputCol(cat1Index.getOutputCol()).setOutputCol("CatVector1");
- StringIndexer cat2Index = new StringIndexer().setInputCol("CatFeature2").setOutputCol("indexedCat2");
- OneHotEncoder cat2Encoder = new OneHotEncoder().setInputCol(cat2Index.getOutputCol()).setOutputCol("CatVector2");
至此,,特征預(yù)處理步驟基本完成了,。由于上述特征都是處于單獨(dú)的列并且列名獨(dú)立,為方便后續(xù)模型進(jìn)行特征輸入,,需要將其轉(zhuǎn)換為特征向量,,并統(tǒng)一命名,
可以使用VectorAssembler類完成這一任務(wù):
- /*轉(zhuǎn)換為特征向量*/
- String[] vectorAsCols = {"intFeature1","intFeature2","CatVector1","CatVector2"};
- VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature");
通常,,預(yù)處理之后獲得的特征有成千上萬維,,出于去除冗余特征、消除維數(shù)災(zāi)難,、提高模型質(zhì)量的考慮,,需要進(jìn)行選擇。在此,,使用卡方檢驗(yàn)方法,,
利用特征與類標(biāo)簽之間的相關(guān)性,進(jìn)行特征選?。?/span>
- /*特征較多時(shí),,使用卡方檢驗(yàn)進(jìn)行特征選擇,主要是考察特征與類標(biāo)簽的相關(guān)性*/
- ChiSqSelector chiSqSelector = new ChiSqSelector().setFeaturesCol("vectorFeature").setLabelCol("label").setNumTopFeatures(10)
- .setOutputCol("selectedFeature");
在特征預(yù)處理和特征選取完成之后,就可以定義模型及其參數(shù)了,。簡(jiǎn)單期間,,在此使用LogisticRegression模型,并設(shè)定最大迭代次數(shù),、正則化項(xiàng):
- /* 設(shè)置最大迭代次數(shù)和正則化參數(shù) setElasticNetParam=0.0 為L(zhǎng)2正則化 setElasticNetParam=1.0為L(zhǎng)1正則化*/
- /*設(shè)置特征向量的列名,,標(biāo)簽的列名*/
- LogisticRegression logModel = new LogisticRegression().setMaxIter(100).setRegParam(0.1).setElasticNetParam(0.0)
- .setFeaturesCol("selectedFeature").setLabelCol("lable");
在上述準(zhǔn)備步驟完成之后,就可以開始定義pipeline并進(jìn)行模型的學(xué)習(xí)了:
- /*將特征轉(zhuǎn)換,,特征聚合,,模型等組成一個(gè)管道,并調(diào)用它的fit方法擬合出模型*/
- PipelineStage[] pipelineStage = {cat1Index,cat2Index,cat1Encoder,cat2Encoder,vectorAssembler,logModel};
- Pipeline pipline = new Pipeline().setStages(pipelineStage);
- PipelineModel pModle = pipline.fit(dfTrain);
上面pipeline的fit方法得到的是一個(gè)Transformer,,我們可以使它作用于訓(xùn)練集得到模型在訓(xùn)練集上的預(yù)測(cè)結(jié)果:
- //擬合得到模型的transform方法進(jìn)行預(yù)測(cè)
- DataFrame output = pModle.transform(dfTest).select("selectedFeature", "label", "prediction", "rawPrediction", "probability");
- DataFrame prediction = output.select("label", "prediction");
- prediction.show();
分析計(jì)算,,得到模型在訓(xùn)練集上的準(zhǔn)確率,看看模型的效果怎么樣:
- /*測(cè)試集合上的準(zhǔn)確率*/
- long correct = prediction.filter(prediction.col("label").equalTo(prediction.col("'prediction"))).count();
- long total = prediction.count();
- double accuracy = correct / (double)total;
-
- System.out.println(accuracy);
最后,,可以將模型保存下來,,下次直接使用就可以了:
- String pModlePath = ""file:\\D:\\dac_sample\\";
- pModle.save(pModlePath);
三,梳理和總結(jié):
上述,,借助代碼實(shí)現(xiàn)了基于spark ml pipeline的機(jī)器學(xué)習(xí),,包括數(shù)據(jù)轉(zhuǎn)換、特征生成,、特征選取,、模型定義及模型學(xué)習(xí)等多個(gè)stage,得到的pipeline
模型后,,就可以在新的數(shù)據(jù)集上進(jìn)行預(yù)測(cè),,總結(jié)為兩部分并用流程圖表示如下:
訓(xùn)練階段:
預(yù)測(cè)階段:
借助于Pepeline,在spark上進(jìn)行機(jī)器學(xué)習(xí)的數(shù)據(jù)流向更加清晰,,同時(shí)每一stage的任務(wù)也更加明了,,因此,無論是在模型的預(yù)測(cè)使用上,、還是
模型后續(xù)的改進(jìn)優(yōu)化上,,都變得更加容易。
|