久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

運(yùn)行第一個(gè)SparkStreaming程序

 昵稱23016082 2017-01-11

官方示例說明

按照官方文檔的 這個(gè)示例說明,,可以輕松的在本地的spark-shell環(huán)境中測試這個(gè)示例。示例,,即為了更好的入門,,那么就再說明一下。
運(yùn)行這個(gè)統(tǒng)計(jì)單詞的方式有三種,,前面兩種是官方文檔上的指引,,第三種則是用scala程序運(yùn)行。


  • 第一種方式, run-demo

  • 打開一個(gè)終端,,打開一個(gè)終端,,輸入 命令 nc -lk 9999,暫時(shí)叫做 “nc終端” 吧
  • 再打開終端,,切換到Spark HOME目錄,, 執(zhí)行命令 bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999, 然后每秒會(huì)有類似一下日志循環(huán)輸出

    -------------------------------------------
    Time: 1415701382000 ms
    -------------------------------------------
    -------------------------------------------
    Time: 1415701383000 ms
    -------------------------------------------

  • 在nc終端隨便輸入一些字符串,,用空格隔開,,回車,如aa aa bb c,??梢栽谏厦娴腟park終端中看到有新內(nèi)容輸出

    -------------------------------------------
    Time: 1415701670000 ms
    -------------------------------------------
    (aa,2)
    (bb,1)
    (c,1)

OK,成功,!


  • 第二種 spark-shell 模式

    下面介紹在spark-shell中輸入scala代碼運(yùn)行的方式,。
  • 同上面第一步,打開一個(gè)終端,,打開一個(gè)終端,,輸入 命令 nc -lk 9999,,暫時(shí)叫做 “nc終端” 吧
  • 再打開一個(gè)終端, 切換到Spark HOME目錄下,,輸入 bin/spark-shell (如果你已經(jīng)安裝好了Spark的話,,直接輸入 spark-shell 即可),等待Spark啟動(dòng)成功,,會(huì)打印信息

    Spark context available as sc.
    scala>

    然后輸入以下語句:

import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._

// Create a StreamingContext with a local master
val ssc = new StreamingContext(sc, Seconds(1))

// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print a few of the counts to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

會(huì)打印以下信息:

14/11/11 18:07:23 INFO MemoryStore: ensureFreeSpace(2216) called with curMem=100936, maxMem=278019440
......
14/11/11 18:07:23 INFO DAGScheduler: Stage 91 (take at DStream.scala:608) finished in 0.004 s
14/11/11 18:07:23 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.007531701 s
-------------------------------------------
Time: 1415700443000 ms
-------------------------------------------

  1. 同第一種方式的第3步,,隨便輸入一些字符串,用空格隔開,,回車,,如aa aa bb c??梢栽谏厦娴腟park終端中看到有新內(nèi)容輸出

    -------------------------------------------
    Time: 1415701670000 ms
    -------------------------------------------
    (aa,2)
    (bb,1)
    (c,1)

    OK,,成功!


  • 第三種 scala-ide編程方式

    在用這種方式運(yùn)行這個(gè)demo代碼的時(shí)候,,遇到了不少問題,記錄下來,,供大家參考,。這個(gè)例子,請大家先根據(jù)這里記錄的方式進(jìn)行操作,,得到一個(gè)可以運(yùn)行的程序,,后面我會(huì)記錄遇到的問題。

  • 下載scala-ide, 下載鏈接,,下載 For Scala 2.10.4 下的對應(yīng)平臺的ide,,解壓,運(yùn)行,。

  • 安裝sbt,,下載鏈接,
  • 安裝sbteclipse, github地址, 編輯 ~/.sbt/0.13/plugins/plugins.sbt 文件, 添加以下內(nèi)容 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0"),,如果沒有plugins目錄和plugins.sbt,,自行創(chuàng)建。
  • 用向?qū)?chuàng)建一個(gè)scala項(xiàng)目,,并在項(xiàng)目根目錄下創(chuàng)建一個(gè)build.sbt文件,,添加以下內(nèi)容(注意,每行正式語句之后要換行)

    name := "spark-test"

    version := "1.0"

    scalaVersion := "2.10.4"

    // set the main class for the main 'run' task
    // change Compile to Test to set it for 'test:run'
    mainClass in (Compile, run) := Some("test.SparkTest")

    libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"

  • 創(chuàng)建test.SparkTest.scala文件,,添加以下代碼

    package test
    import org.apache.spark.streaming.
    import org.apache.spark.streaming.StreamingContext.

    import org.apache.spark.SparkContext
    import org.apache.spark.api.java.function.
    import org.apache.spark.streaming.

    import org.apache.spark.streaming.api._

    object SparkTest {
    def main(args: Array[String]): Unit = {
    // Create a StreamingContext with a local master
    // Spark Streaming needs at least two working thread
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10))
    // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    // Split each line into words
    val words = lines.flatMap(.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(
     + _)
    wordCounts.print
    ssc.start
    ssc.awaitTermination
    }
    }

  • 終端中切換目錄到這個(gè)項(xiàng)目根目錄,,輸入命令 sbt , 命令運(yùn)行成功后,,敲入 eclipse生成eclipse項(xiàng)目和項(xiàng)目所需依賴

  • 同第一種方式的第1,3步,,
    再打開一個(gè)終端,,輸入 命令 nc -lk 9999
    然后運(yùn)行剛才寫的main程序,,在nc終端中輸入一些字符串,,用空格隔開,回車,,如aa aa bb c,。可以在ide控制臺中觀察到

    -------------------------------------------
    Time: 1415701670000 ms
    -------------------------------------------
    (aa,2)
    (bb,1)
    (c,1)

OK,,成功,!


下面是遇到的問題及解決方法:

1. 運(yùn)行程序說找不到主類

解:沒有在sbt文件配置主類是哪個(gè),在build.sbt 文件中添加以下代碼

mainClass in (Compile, run) := Some("test.SparkTest")

Some中就是主類的路徑

2. java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class

這個(gè)問題困擾了我很長時(shí)間,,一直沒找到怎么解決,。后來看到說是scala每次版本升級不兼容以前的版本編譯的庫,于是換了對應(yīng)的版本的ide才正常運(yùn)行,。
解:scala-ide版本和現(xiàn)在用的spark包依賴編譯的scala版本不一致,, 請下載上面說過的 scala-ide For Scala 2.10.4 版本。

    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式,、誘導(dǎo)購買等信息,,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,,請點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多