官方示例說明按照官方文檔的 這個(gè)示例說明,,可以輕松的在本地的spark-shell環(huán)境中測試這個(gè)示例。示例,,即為了更好的入門,,那么就再說明一下。 運(yùn)行這個(gè)統(tǒng)計(jì)單詞的方式有三種,,前面兩種是官方文檔上的指引,,第三種則是用scala程序運(yùn)行。
第一種方式, run-demo- 打開一個(gè)終端,,打開一個(gè)終端,,輸入 命令
nc -lk 9999 ,暫時(shí)叫做 “nc終端” 吧 再打開終端,,切換到Spark HOME目錄,, 執(zhí)行命令 bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 , 然后每秒會(huì)有類似一下日志循環(huán)輸出 ------------------------------------------- Time: 1415701382000 ms ------------------------------------------- ------------------------------------------- Time: 1415701383000 ms -------------------------------------------
在nc終端隨便輸入一些字符串,,用空格隔開,,回車,如aa aa bb c,??梢栽谏厦娴腟park終端中看到有新內(nèi)容輸出 ------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK,成功,!
第二種 spark-shell 模式下面介紹在spark-shell中輸入scala代碼運(yùn)行的方式,。- 同上面第一步,打開一個(gè)終端,,打開一個(gè)終端,,輸入 命令
nc -lk 9999 ,,暫時(shí)叫做 “nc終端” 吧 再打開一個(gè)終端, 切換到Spark HOME目錄下,,輸入 bin/spark-shell (如果你已經(jīng)安裝好了Spark的話,,直接輸入 spark-shell 即可),等待Spark啟動(dòng)成功,,會(huì)打印信息 Spark context available as sc. scala>
然后輸入以下語句:
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._
// Create a StreamingContext with a local master
val ssc = new StreamingContext(sc, Seconds(1))
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print a few of the counts to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
會(huì)打印以下信息: 14/11/11 18:07:23 INFO MemoryStore: ensureFreeSpace(2216) called with curMem=100936, maxMem=278019440 ...... 14/11/11 18:07:23 INFO DAGScheduler: Stage 91 (take at DStream.scala:608) finished in 0.004 s 14/11/11 18:07:23 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.007531701 s ------------------------------------------- Time: 1415700443000 ms -------------------------------------------
同第一種方式的第3步,,隨便輸入一些字符串,用空格隔開,,回車,,如aa aa bb c??梢栽谏厦娴腟park終端中看到有新內(nèi)容輸出 ------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK,,成功!
第三種 scala-ide編程方式在用這種方式運(yùn)行這個(gè)demo代碼的時(shí)候,,遇到了不少問題,記錄下來,,供大家參考,。這個(gè)例子,請大家先根據(jù)這里記錄的方式進(jìn)行操作,,得到一個(gè)可以運(yùn)行的程序,,后面我會(huì)記錄遇到的問題。 下載scala-ide, 下載鏈接,,下載 For Scala 2.10.4 下的對應(yīng)平臺的ide,,解壓,運(yùn)行,。 - 安裝sbt,,下載鏈接,
- 安裝sbteclipse, github地址, 編輯
~/.sbt/0.13/plugins/plugins.sbt 文件, 添加以下內(nèi)容 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0") ,,如果沒有plugins目錄和plugins.sbt,,自行創(chuàng)建。 用向?qū)?chuàng)建一個(gè)scala項(xiàng)目,,并在項(xiàng)目根目錄下創(chuàng)建一個(gè)build.sbt文件,,添加以下內(nèi)容(注意,每行正式語句之后要換行) name := "spark-test" version := "1.0" scalaVersion := "2.10.4" // set the main class for the main 'run' task // change Compile to Test to set it for 'test:run' mainClass in (Compile, run) := Some("test.SparkTest") libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"
創(chuàng)建test.SparkTest.scala文件,,添加以下代碼 package test import org.apache.spark.streaming. import org.apache.spark.streaming.StreamingContext. import org.apache.spark.SparkContext import org.apache.spark.api.java.function. import org.apache.spark.streaming. import org.apache.spark.streaming.api._ object SparkTest { def main(args: Array[String]): Unit = { // Create a StreamingContext with a local master // Spark Streaming needs at least two working thread val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10)) // Create a DStream that will connect to serverIP:serverPort, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) // Split each line into words val words = lines.flatMap(.split(" ")) // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey( + _) wordCounts.print ssc.start ssc.awaitTermination } }
終端中切換目錄到這個(gè)項(xiàng)目根目錄,,輸入命令 sbt , 命令運(yùn)行成功后,,敲入 eclipse 生成eclipse項(xiàng)目和項(xiàng)目所需依賴 - 同第一種方式的第1,3步,,
再打開一個(gè)終端,,輸入 命令 nc -lk 9999 。 然后運(yùn)行剛才寫的main程序,,在nc終端中輸入一些字符串,,用空格隔開,回車,,如aa aa bb c,。可以在ide控制臺中觀察到------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK,,成功,!
下面是遇到的問題及解決方法:1. 運(yùn)行程序說找不到主類解:沒有在sbt文件配置主類是哪個(gè),在build.sbt 文件中添加以下代碼 mainClass in (Compile, run) := Some("test.SparkTest")
Some中就是主類的路徑 2. java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class這個(gè)問題困擾了我很長時(shí)間,,一直沒找到怎么解決,。后來看到說是scala每次版本升級不兼容以前的版本編譯的庫,于是換了對應(yīng)的版本的ide才正常運(yùn)行,。 解:scala-ide版本和現(xiàn)在用的spark包依賴編譯的scala版本不一致,, 請下載上面說過的 scala-ide For Scala 2.10.4 版本。
|