使用Spark SQL中的內(nèi)置函數(shù)對數(shù)據(jù)進(jìn)行分析,Spark SQL API不同的是,,DataFrame中的內(nèi)置函數(shù)操作的結(jié)果是返回一個Column對象,,而DataFrame天生就是"A distributed collection of data organized into named columns.",這就為數(shù)據(jù)的復(fù)雜分析建立了堅(jiān)實(shí)的基礎(chǔ)并提供了極大的方便性,例如說,,我們在操作DataFrame的方法中可以隨時調(diào)用內(nèi)置函數(shù)進(jìn)行業(yè)務(wù)需要的處理,,這之于我們構(gòu)建附件的業(yè)務(wù)邏輯而言是可以極大的減少不必須的時間消耗(基于上就是實(shí)際模型的映射),讓我們聚焦在數(shù)據(jù)分析上,,
在Spark 1.5.x版本,,增加了一系列內(nèi)置函數(shù)到DataFrame API中,并且實(shí)現(xiàn)了code-generation的優(yōu)化,。與普通的函數(shù)不同,,DataFrame的函數(shù)并不會執(zhí)行后立即返回一個結(jié)果值,而是返回一個Column對象,,用于在并行作業(yè)中進(jìn)行求值,。Column可以用在DataFrame的操作之中,比如select,,filter,,groupBy等。函數(shù)的輸入值,,也可以是Column,。
聚合函數(shù)
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
集合函數(shù)
array_contains, explode, size, sort_array
日期/時間函數(shù)
日期時間轉(zhuǎn)換:
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
從日期時間中提取字段:
year, month, dayofmonth, hour, minute, second
日期/時間計(jì)算:
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
獲取當(dāng)前時間等:
current_date, current_timestamp, trunc, date_format
數(shù)學(xué)函數(shù)
abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex
混合函數(shù)
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
字符串函數(shù)
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
窗口函數(shù)
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
案例實(shí)戰(zhàn):
第一步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序的運(yùn)行時的配置信息,,例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,Spark程序在本地運(yùn)行
val conf = new SparkConf() //創(chuàng)建SparkConf對象
conf.setAppName("SparkSQL") //設(shè)置應(yīng)用程序的名稱,,在程序運(yùn)行的監(jiān)控界面可以看到名稱
//conf.setMaster("spark://DaShuJu-040:7077") //此時,程序在Spark集群
conf.setMaster("local")
第二步:創(chuàng)建SparkContext對象
SparkContext是Spark程序所有功能的唯一入口,,無論是采用Scala,、Java、Python,、R等都必須有一個SparkContext
SparkContext核心作用:初始化Spark應(yīng)用程序運(yùn)行所需要的核心組件,,包括DAGScheduler、TaskScheduler,、SchedulerBackend
同時還會負(fù)責(zé)Spark程序往Master注冊程序等
SparkContext是整個Spark應(yīng)用程序中最為至關(guān)重要的一個對象,。
val sc = new SparkContext(conf) //創(chuàng)建SparkContext對象,通過傳入SparkConf實(shí)例來定制Spark運(yùn)行的具體參數(shù)和配置信息
val sqlContext = new SQLContext(sc)
//注意:要使用Spark SQL的內(nèi)置函數(shù),,就一定要導(dǎo)入SQLContext下的隱式轉(zhuǎn)換
import sqlContext.implicits._
第三步:模擬數(shù)據(jù),,最后生成RDD
val userData = Array(
"2016-3-27,001,http://spark./,1000",
"2016-3-27,001,http://hadoop./,1001",
"2016-3-27,002,http://fink./,1002",
"2016-3-28,003,http://kafka./,1020",
"2016-3-28,004,http://spark./,1010",
"2016-3-28,002,http://hive./,1200",
"2016-3-28,001,http://parquet./,1500",
"2016-3-28,001,http://spark./,1800"
)
//并行化成RDD
val userDataRDD = sc.parallelize(userData)
第四步:根據(jù)業(yè)務(wù)需要對數(shù)據(jù)進(jìn)行預(yù)處理生成DataFrame,,要想把RDD轉(zhuǎn)換成DataFrame,需要先把RDD中的元素類型變成Row類型,,于此同時要提供DataFrame中的Columns的元數(shù)據(jù)信息描述,。
val userDataRDDRow = userDataRDD.map(row => {val splited = row.split(",") ;Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})
val structTypes = StructType(Array(
StructField("time", StringType, true),
StructField("id", IntegerType, true),
StructField("url", StringType, true),
StructField("amount", IntegerType, true)
))
val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)
第五步:使用Spark SQL提供的內(nèi)置函數(shù)對DataFrame進(jìn)行操作,特別注意:內(nèi)置函數(shù)生成的Column對象且自定進(jìn)行CG
userDataDF.groupBy("time").agg('time, countDistinct('id)).map(row=>Row(row(1),row(2))).collect.foreach { println }
userDataDF.groupBy("time").agg('time, sum('amount)).show()
“`
|