你好,游客 登录
背景:
阅读新闻

云日志分析:hadoop spark编程实战入门教程10:使用Intellij Idea搭建Spark Streaming开发环境(SBT版本)

[日期:2017-09-05] 来源:aboutyun  作者:pig2 [字体: ]

问题导读:
1.  如何创建SBT项目?
2.  如何添加SBT依赖?
3.  如何测试Spark Streaming程序可以正常运行?




接上篇:about云日志分析项目准备9:Flume安装和使用
到目前为止,我们已经搭建好了整个项目所需要的环境,现在来讲解开发环境的搭建。我们采用 Intellij Idea 作为开发工具。 Intellij Idea 下载地址:https://www.jetbrains.com/idea/#chooseYourEdition。  根据自己的系统来选择不同的安装 包(这儿我们选用社区版,因为是免费的)。

注意:
安装 Intellij Idea 前,我们需要确保本机已经安装了jdk和scala。如果未安装,需要先安装。可以参考我们之前的about云日志分析项目准备6:Hadoop、Spark集群搭建 中的jdk、scala安装部分。这儿jdk选择1.8、scala选择2.11.8版本。
Intellij Idea的安装过程就不说了,基本上是一路“next”,安装好Intellij Idea之后,需要安装Scala和SBT插件。

创建项目

安装完成后,我们直接开始创建项目。





点击完 “Finish”按钮后,Intellij Idea会进行该项目的构建,这个过程需要一定的时间,因为sbt需要下载指定scala版本(在这里是scala 2.11.8)的基础依赖包。下载过程如下:


注意:如果下载不成功的话,需要考虑翻墙

项目构建完成后的目录结构如下所示:



添加依赖

由于我们编写的是 Spark Streaming 程序,所以还需要添加相关的依赖。其实就是编写build.sbt文件。我们可以先查看下现有的build.sbt文件内容:


然后将以下依赖加入(注意空格):

  1. libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "1.6.3"
  2. libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.11" % "1.6.3"
加好后如下所示:



点击“Refresh project”,Intellij Idea会通过SBT下载这些写好的依赖,这个过程也会花费很长的时间。

编写程序

目前创建了aboutyun_log_analysis工程,工程中包含了aboutyun_log_analysis/src/main/ 和 aboutyun_log_analysis/src/test/ 目录,其中前者是用于生产环境中代码编写,后者用于测试代码编写。本文将在后者目录进行代码编写。




这样我们就创建了一个StreamingDemo类。接下来就是真正开始编写代码了。在这里我们来使用Streaming来实现WordCount。代码如下:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.storage.StorageLevel
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4. /**
  5.   * Created by wangwei01 on 2017/2/3.
  6.   */
  7. object StreamingDemo {
  8.   def main(args: Array[String]): Unit = {
  9.     // 创建SparkConf对象,并指定AppName和Master
  10.     val conf = new SparkConf().setAppName("StreamingDemo").setMaster("local[*]")
  11.     // 创建StreamingContext对象
  12.     val ssc = new StreamingContext(conf, Seconds(10))
  13.     val hostname = "192.168.1.10" // 即我们的master虚拟机
  14.     val port = 9999 // 端口号
  15.     // 创建DStream对象
  16.     val lines = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_AND_DISK_SER)
  17.     val words = lines.flatMap(_.split(" "))
  18.     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  19.     wordCounts.print()
  20.     ssc.start()
  21.     ssc.awaitTermination()
  22.   }
  23. }
测试程序


之后我们在我们的master虚拟机上使用如下命令:

  1. nc -lk 9999

然后在Intellij Idea中运行我们编写好的Streaming程序。


运行之后会在控制台中输出如下信息:


然后我们在master虚拟机的端口中输入以下内容:

  1. hello aboutyun
  2. aboutyun is my firend

然后查看Intellij Idea的控制台:


说明我们的Spark Streaming程序成功运行了。

收藏 推荐 打印 | 阅读: