久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

Apache Spark源碼走讀之13

 openlog 2014-06-06

歡迎轉(zhuǎn)載,,轉(zhuǎn)載請(qǐng)注明出處,徽滬一郎

概要

在新近發(fā)布的spark 1.0中新加了sql的模塊,,更為引人注意的是對(duì)hive中的hiveql也提供了良好的支持,,作為一個(gè)源碼分析控,了解一下spark是如何完成對(duì)hql的支持是一件非常有趣的事情,。

Hive簡(jiǎn)介

Hive的由來(lái)

以下部分摘自Hadoop definite guide中的Hive一章

Hive由Facebook出品,,其設(shè)計(jì)之初目的是讓精通SQL技能的分析師能夠?qū)acebook存放在HDFS上的大規(guī)模數(shù)據(jù)集進(jìn)行分析和查詢。

Hive大大簡(jiǎn)化了對(duì)大規(guī)模數(shù)據(jù)集的分析門(mén)檻(不再要求分析人員具有很強(qiáng)的編程能力),,迅速流行起來(lái),,成為Hadoop生成圈上的Killer Application. 目前已經(jīng)有很多組織把Hive作為一個(gè)通用的,可伸縮數(shù)據(jù)處理平臺(tái),。

數(shù)據(jù)模型(Data Model)

Hive所有的數(shù)據(jù)都存在HDFS中,,在Hive中有以下幾種數(shù)據(jù)模型

  • Tables(表) table和關(guān)系型數(shù)據(jù)庫(kù)中的表是相對(duì)應(yīng)的,每個(gè)表都有一個(gè)對(duì)應(yīng)的hdfs目錄,,表中的數(shù)據(jù)經(jīng)序列化后存儲(chǔ)在該目錄,,Hive同時(shí)支持表中的數(shù)據(jù)存儲(chǔ)在其它類(lèi)型的文件系統(tǒng)中,如NFS或本地文件系統(tǒng)
  • 分區(qū)(Partitions) Hive中的分區(qū)起到的作用有點(diǎn)類(lèi)似于RDBMS中的索引功能,,每個(gè)Partition都有一個(gè)對(duì)應(yīng)的目錄,,這樣在查詢的時(shí)候,可以減少數(shù)據(jù)規(guī)模
  • 桶(buckets) 即使將數(shù)據(jù)按分區(qū)之后,,每個(gè)分區(qū)的規(guī)模有可能還是很大,,這個(gè)時(shí)候,,按照關(guān)鍵字的hash結(jié)果將數(shù)據(jù)分成多個(gè)buckets,每個(gè)bucket對(duì)應(yīng)于一個(gè)文件

Query Language

 HiveQL是Hive支持的類(lèi)似于SQL的查詢語(yǔ)言,。HiveQL大體可以分成下面兩種類(lèi)型

  1. DDL(data definition language)  比如創(chuàng)建數(shù)據(jù)庫(kù)(create database),創(chuàng)建表(create table),數(shù)據(jù)庫(kù)和表的刪除
  2. DML(data manipulation language) 數(shù)據(jù)的添加,,查詢
  3. UDF(user defined function) Hive還支持用戶自定義查詢函數(shù)

Hive architecture

hive的整體框架圖如下圖所示

 

由上圖可以看出,Hive的整體架構(gòu)可以分成以下幾大部分

  1. 用戶接口  支持CLI, JDBC和Web UI
  2. Driver Driver負(fù)責(zé)將用戶指令翻譯轉(zhuǎn)換成為相應(yīng)的MapReduce Job
  3. MetaStore 元數(shù)據(jù)存儲(chǔ)倉(cāng)庫(kù),,像數(shù)據(jù)庫(kù)和表的定義這些內(nèi)容就屬于元數(shù)據(jù)這個(gè)范疇,,默認(rèn)使用的是Derby存儲(chǔ)引擎

HiveQL執(zhí)行過(guò)程

HiveQL的執(zhí)行過(guò)程如下所述

  1. parser 將HiveQL解析為相應(yīng)的語(yǔ)法樹(shù)
  2. Semantic Analyser 語(yǔ)義分析
  3. Logical Plan Generating 生成相應(yīng)的LogicalPlan
  4. Query Plan Generating
  5. Optimizer

最終生成MapReduce的Job,交付給Hadoop的MapReduce計(jì)算框架具體運(yùn)行,。

Hive實(shí)例

最好的學(xué)習(xí)就是實(shí)戰(zhàn),,Hive這一小節(jié)還是以一個(gè)具體的例子來(lái)結(jié)束吧。

前提條件是已經(jīng)安裝好hadoop,,具體安裝可以參考源碼走讀11或走讀9

step 1: 創(chuàng)建warehouse

warehouse用來(lái)存儲(chǔ)raw data

$ $HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse

step 2: 啟動(dòng)hive cli

$ export HIVE_HOME=<hive-install-dir>
$ $HIVE_HOME/bin/hive

step 3: 創(chuàng)建表

創(chuàng)建表,,首先將schema數(shù)據(jù)寫(xiě)入到metastore,另一件事情就是在warehouse目錄下創(chuàng)建相應(yīng)的子目錄,該子目錄以表的名稱(chēng)命名

CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

step 4: 導(dǎo)入數(shù)據(jù)

導(dǎo)入的數(shù)據(jù)會(huì)存儲(chǔ)在step 3中創(chuàng)建的表目錄下

LOAD DATA LOCAL INPATH '/u.data'
OVERWRITE INTO TABLE u_data;

step 5: 查詢

SELECT COUNT(*) FROM u_data;

 hiveql on Spark

Q: 上一章節(jié)花了大量的篇幅介紹了hive由來(lái),,框架及hiveql執(zhí)行過(guò)程,。那這些東西跟我們標(biāo)題中所稱(chēng)的hive on spark有什么關(guān)系呢?

Ans:  Hive的整體解決方案很不錯(cuò),,但有一些地方還值得改進(jìn),,其中之一就是“從查詢提交到結(jié)果返回需要相當(dāng)長(zhǎng)的時(shí)間,查詢耗時(shí)太長(zhǎng),。之所以查詢時(shí)間很長(zhǎng),,一個(gè)主要的原因就是因?yàn)镠ive原生是基于MapReduce的,哪有沒(méi)有辦法提高呢,。您一定想到了,,“不是生成MapReduce Job,而是生成Spark Job”, 充分利用Spark的快速執(zhí)行能力來(lái)縮短HiveQl的響應(yīng)時(shí)間,。

下圖是Spark 1.0中所支持的lib庫(kù),,SQL是其唯一新添加的lib庫(kù),可見(jiàn)SQL在Spark 1.0中的地位之重要,。

 

HiveContext

HiveContext是Spark提供的用戶接口,,HiveContext繼承自SqlContext。

讓我們回顧一下,,SqlContext中牽涉到的類(lèi)及其間的關(guān)系如下圖所示,具體分析過(guò)程參見(jiàn)本系列中的源碼走讀之11,。

既然是繼承自SqlContext,,那么我們將普通sql與hiveql分析執(zhí)行步驟做一個(gè)對(duì)比,,可以得到下圖。

 

有了上述的比較,,就能抓住源碼分析時(shí)需要把握的幾個(gè)關(guān)鍵點(diǎn)

  1. Entrypoint           HiveContext.scala
  2. QueryExecution    HiveContext.scala
    1. parser       HiveQl.scala
    2. optimizer    

數(shù)據(jù)

使用到的數(shù)據(jù)有兩種

  1. Schema Data  像數(shù)據(jù)庫(kù)的定義和表的結(jié)構(gòu),,這些都存儲(chǔ)在MetaStore中
  2. Raw data        即要分析的文件本身

Entrypoint

hiveql是整個(gè)的入口點(diǎn),,而hql是hiveql的縮寫(xiě)形式。

  def hiveql(hqlQuery: String): SchemaRDD = {
    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but does not perform any execution.
    result.queryExecution.toRdd
    result
  }

上述hiveql的定義與sql的定義幾乎一模一樣,,唯一的不同是sql中使用parseSql的結(jié)果作為SchemaRDD的入?yún)⒍鴋iveql中使用HiveQl.parseSql作為SchemaRdd的入?yún)?/p>

HiveQL, parser

parseSql的函數(shù)定義如代碼所示,,解析過(guò)程中將指令分成兩大類(lèi)

  • nativecommand     非select語(yǔ)句,這類(lèi)語(yǔ)句的特點(diǎn)是執(zhí)行時(shí)間不會(huì)因?yàn)闂l件的不同而有很大的差異,,基本上都能在較短的時(shí)間內(nèi)完成
  • 非nativecommand  主要是select語(yǔ)句
def parseSql(sql: String): LogicalPlan = {
    try {
      if (sql.toLowerCase.startsWith("set")) {
        NativeCommand(sql)
      } else if (sql.toLowerCase.startsWith("add jar")) {
        AddJar(sql.drop(8))
      } else if (sql.toLowerCase.startsWith("add file")) {
        AddFile(sql.drop(9))
      } else if (sql.startsWith("dfs")) {
        DfsCommand(sql)
      } else if (sql.startsWith("source")) {
        SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
      } else if (sql.startsWith("!")) {
        ShellCommand(sql.drop(1))
      } else {
        val tree = getAst(sql)

        if (nativeCommands contains tree.getText) {
          NativeCommand(sql)
        } else {
          nodeToPlan(tree) match {
            case NativePlaceholder => NativeCommand(sql)
            case other => other
          }
        }
      }
    } catch {
      case e: Exception => throw new ParseException(sql, e)
      case e: NotImplementedError => sys.error(
        s"""
          |Unsupported language features in query: $sql
          |${dumpTree(getAst(sql))}
        """.stripMargin)
    }
  }	

哪些指令是nativecommand呢,,答案在HiveQl.scala中的nativeCommands變量,列表很長(zhǎng),,代碼就不一一列出,。

對(duì)于非nativeCommand,最重要的解析函數(shù)就是nodeToPlan

toRdd

Spark對(duì)HiveQL所做的優(yōu)化主要體現(xiàn)在Query相關(guān)的操作,,其它的依然使用Hive的原生執(zhí)行引擎,。

在logicalPlan到physicalPlan的轉(zhuǎn)換過(guò)程中,toRdd最關(guān)鍵的元素

override lazy val toRdd: RDD[Row] =
      analyzed match {
        case NativeCommand(cmd) =>
          val output = runSqlHive(cmd)

          if (output.size == 0) {
            emptyResult
          } else {
            val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
            sparkContext.parallelize(asRows, 1)
          }
        case _ =>
          executedPlan.execute().map(_.copy())
      }

native command的執(zhí)行流程

由于native command是一些非耗時(shí)的操作,,直接使用Hive中原有的exeucte engine來(lái)執(zhí)行即可,。這些command的執(zhí)行示意圖如下

analyzer

HiveTypeCoercion

val typeCoercionRules =
    List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
      StringToIntegralCasts, FunctionArgumentConversion)		

optimizer

PreInsertionCasts存在的目的就是確保在數(shù)據(jù)插入執(zhí)行之前,相應(yīng)的表已經(jīng)存在,。

override lazy val optimizedPlan =
      optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))

此處要注意的是catalog的用途,,catalog是HiveMetastoreCatalog的實(shí)例。

HiveMetastoreCatalog是Spark中對(duì)Hive Metastore訪問(wèn)的wrapper,。HiveMetastoreCatalog通過(guò)調(diào)用相應(yīng)的Hive Api可以獲得數(shù)據(jù)庫(kù)中的表及表的分區(qū),,也可以創(chuàng)建新的表和分區(qū)。

HiveMetastoreCatalog

HiveMetastoreCatalog中會(huì)通過(guò)hive client來(lái)訪問(wèn)metastore中的元數(shù)據(jù),,使用了大量的Hive Api,。其中包括了廣為人知的deSer library。

以CreateTable函數(shù)為例說(shuō)明對(duì)Hive Library的依賴(lài),。

def createTable(
      databaseName: String,
      tableName: String,
      schema: Seq[Attribute],
      allowExisting: Boolean = false): Unit = {
    val table = new Table(databaseName, tableName)
    val hiveSchema =
      schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
    table.setFields(hiveSchema)

    val sd = new StorageDescriptor()
    table.getTTable.setSd(sd)
    sd.setCols(hiveSchema)

    // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
    sd.setCompressed(false)
    sd.setParameters(Map[String, String]())
    sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
    sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
    val serDeInfo = new SerDeInfo()
    serDeInfo.setName(tableName)
    serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
    serDeInfo.setParameters(Map[String, String]())
    sd.setSerdeInfo(serDeInfo)

    try client.createTable(table) catch {
      case e: org.apache.hadoop.hive.ql.metadata.HiveException
        if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
           allowExisting => // Do nothing.
    }
  }

實(shí)驗(yàn)

結(jié)合源碼,,我們?cè)賹?duì)一個(gè)簡(jiǎn)單的例子作下說(shuō)明。

可能你會(huì)想,,既然spark也支持hql,,那么我原先用hive cli創(chuàng)建的數(shù)據(jù)庫(kù)和表用spark能不能訪問(wèn)到呢?答案或許會(huì)讓你很納悶,,“在默認(rèn)的配置下是不行的”,。為什么?

Hive中的meta data采用的存儲(chǔ)引擎是Derby,,該存儲(chǔ)引擎只能有一個(gè)訪問(wèn)用戶,。同一時(shí)刻只能有一個(gè)人訪問(wèn),即便以同一用戶登錄訪問(wèn)也不行。針對(duì)這個(gè)局限,,解決方法就是將metastore存儲(chǔ)在mysql或者其它可以多用戶訪問(wèn)的數(shù)據(jù)庫(kù)中,。

具體實(shí)例

  1. 創(chuàng)建表
  2. 導(dǎo)入數(shù)據(jù)
  3. 查詢
  4. 刪除表

在啟動(dòng)spark-shell之前,需要先設(shè)置環(huán)境變量HIVE_HOMEHADOOP_HOME.

啟動(dòng)spark-shell之后,,執(zhí)行如下代碼

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hql("FROM src SELECT key, value").collect().foreach(println)
hql("drop table src")

create操作會(huì)在/user/hive/warehouse/目錄下創(chuàng)建src目錄,,可以用以下指令來(lái)驗(yàn)證

$$HADOOP_HOME/bin/hdfs dfs -ls /user/hive/warehouse/

 drop表的時(shí)候,不僅metastore中相應(yīng)的記錄被刪除,,而且原始數(shù)據(jù)raw file本身也會(huì)被刪除,,即在warehouse目錄下對(duì)應(yīng)某個(gè)表的目錄會(huì)被整體刪除掉。

上述的create, load及query操作對(duì)metastore和raw data的影響可以用下圖的表示

hive-site.xml

如果想對(duì)hive默認(rèn)的配置作修改,,可以使用hive-site.xml,。

具體步驟如下

 -  在$SPARK_HOME/conf目錄下創(chuàng)建hive-site.xml

 -  根據(jù)需要,添寫(xiě)相應(yīng)的配置項(xiàng)的值,,可以這樣做,,將$HIVE_HOME/conf目錄下的hive-default.xml復(fù)制到$SPARK_HOME/conf,然后重命名為hive-site.xml

Sql新功能預(yù)告

為了進(jìn)一步提升sql的執(zhí)行速度,,在Spark開(kāi)發(fā)團(tuán)隊(duì)在發(fā)布完1.0之后,,會(huì)通過(guò)codegen的方法來(lái)提升執(zhí)行速度。codegen有點(diǎn)類(lèi)似于jvm中的jit技術(shù),。充分利用了scala語(yǔ)言的特性,。

前景分析

Spark目前還缺乏一個(gè)非常有影響力的應(yīng)用,也就通常所說(shuō)的killer application,。SQL是Spark在尋找killer application方面所做的一個(gè)積極嘗試,,也是目前Spark上最有熱度的一個(gè)話題,但通過(guò)優(yōu)化Hive執(zhí)行速度來(lái)吸引潛在Spark用戶,,該突破方向選擇正確與否還有待市場(chǎng)證明,。

Hive除了在執(zhí)行速度上為人詬病之外,還有一個(gè)最大的問(wèn)題就是多用戶訪問(wèn)的問(wèn)題,,相較第一個(gè)問(wèn)題,,第二個(gè)問(wèn)題來(lái)得更為致命。無(wú)論是Facebook在Hive之后推出的Presto還是Cloudera推出的Impala都是針對(duì)第二問(wèn)題提出的解決方案,,目前都已經(jīng)取得的了巨大優(yōu)勢(shì),。

小結(jié)

本文就Spark對(duì)HiveQL提供支持的這一功能進(jìn)行了比較詳細(xì)的分析,其中涉及到以下幾個(gè)問(wèn)題,。

  1. 什么是hive
  2. hive有什么缺點(diǎn),,否則就沒(méi)Spark或Shark啥事了
  3. Spark主要是針對(duì)hive的哪個(gè)不足做出改進(jìn)
  4. Spark是如何對(duì)這個(gè)做改進(jìn)的

參考資料

  1. programming hive
  2. Shark vs. Impala
  3. Hive Design

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn),。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請(qǐng)點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類(lèi)似文章 更多