你好,游客 登录
背景:
阅读新闻

云日志分析:hadoop spark编程实战入门教程11:spark streaming 接收 flume 监控目录的日志文件

[日期:2017-09-05] 来源:aboutyun  作者:pig2 [字体: ]

接上篇:
这一篇主要讲解从日志文件的流向,即从flume-->kafka-->spark streaming。

启动相关进程

首先,我们需要确保启动相应的进程。如果启动了可以忽略。

启动hdfs

登录master机器

  1. start-dfs.sh
在master机器上使用 jps 命令查看相关进程



master机器上出现NameNode和 SecondaryNameNode说明master节点启动正常。然后登录slave1和slave2机器,使用 jps 命令查看相关进程。


slave1和slave2节点出现DataNode说明slave节点启动正常。


启动spark

登录master机器

  1. start-master.sh
使用jps命令查看进程



master节点出现 Master 进程说明spark的master节点启动成功。


然后在master机器上继续输入以下命令:

  1. start-slaves.sh



使用jps命令在master机器、slave1机器和slave2机器上


如果每台机器上都能出现 Worker 进程,说明spark的worker节点启动成功。

启动zookeeper

分别登录master机器、slave1机器和slave2机器,输入以下命令:

  1. zkServer.sh start

然后在每台机器上输入jps 命令查看相关进程。


如果每台机上都出现 QuorumPeerMain 进程,说明 zookeeper 启动成功。

启动kafka

分别登录master机器、slave1机器和slave2机器,输入以下命令:

  1. cd $KAFKA_HOME
  2. kafka-server-start.sh -daemon ./config/server.properties

启动之后,使用jps命令查看相关进程:

如果每台机上都出现 Kafka 进程,说明 Kafka 启动成功。


启动flume

参考 about云日志分析项目准备9:Flume安装和使用 这篇中的Flume使用一节。

我们登录master机器。然后按照参考内容,将 flume source的监控目录设置为 /data/aboutyunlog 目录,sink的输出设为 kafka 的 aboutyunlog 这个topic。

之后创建相关目录(如果没有的话)

再之后在kafka上创建 aboutyunlog 这个topic(如果没有的话)。

  1. kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3
完成上述步骤后,启动flume

 

  1. nohup flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console &

nohup 是一个可以用来启动后台进程的命令。

启动之后,使用jps命令查看相关进程:



如果可以看到 Application 这个进程,说明 flume-ng 启动成功。

注意:我们只是在master上启动了flume,用来监控master机器上的/data/aboutyunlog目录。

然后我们将我们的示例日志写入aboutyunlog.example文件中。

  1. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1057 31
  2. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1058 31
  3. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1057 31
  4. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
  5. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
  6. 2017-02-05 09:42:04 GET /plugin.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 925 1072 140
  7. 2017-02-05 09:42:04 GET /uc_server/data/avatar/000/00/55/20_avatar_middle.jpg 58.211.2.60 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 6430 1156 109

将 aboutyunlog.example 文件移动到 flume 监控目录:

  1. mv aboutyunlog.example  /data/aboutyunlog/
经过1分钟后,我们可以查看 /data/aboutyunlog目录,会发现flume已经将该文件标记为 “已完成” 状态,也就是说,flume已经将消息发送到了kafka的aboutyunlog这个topic下。



编写 Streaming 代码

接下来我们在 idea 中编写使用spark streaming读取kafka 中 aboutyunlog这个topic的消息的代码。

  1. import kafka.api.OffsetRequest
  2. import kafka.message.MessageAndMetadata
  3. import kafka.serializer.StringDecoder
  4. import org.apache.log4j.{Level, Logger}
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.streaming.kafka.KafkaUtils
  7. import org.apache.spark.streaming.{Seconds, StreamingContext}
  8. /**
  9.   * Created by wangwei01 on 2017/3/12.
  10.   */
  11. object StreamingReadData {
  12.   Logger.getRootLogger.setLevel(Level.WARN)
  13.   def main(args: Array[String]): Unit = {
  14.     // 创建SparkConf对象,并指定AppName和Master
  15.     val conf = new SparkConf()
  16.         .setAppName("StreamingReadData")
  17.         .setMaster("local")
  18.     // 创建StreamingContext对象
  19.     val ssc = new StreamingContext(conf, Seconds(10))
  20. //    val zkServers = "master:2181,slave1:2181,slave2:2181"
  21.     // 注意:需要在本机的hosts文件中添加 master/slave1/slave2对应的ip
  22.     val brokers = "master:9092,slave1:9092,slave2:9092"
  23.     val topics = "aboutyunlog"
  24.     val groupId = "consumer_001"
  25.     val topicsSet = topics.split(",").toSet
  26.     val kafkaParams = Map[String, String](
  27.      "metadata.broker.list" -> brokers,
  28.      "group.id" -> groupId,
  29.      "auto.offset.reset" -> OffsetRequest.SmallestTimeString  // 说明每次程序启动,从kafka中最开始的第一条消息开始读取
  30.     )
  31.     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  32.      ssc, kafkaParams, topicsSet).map(_._2)
  33.     messages.print()
  34.     ssc.start()
  35.     ssc.awaitTermination()
  36.   }
  37. }
然后启动 spark streaming 程序,我们会看到类似的输出:



这说明我们的spark streaming 程序已经正确读取了从kafka中的消息。

收藏 推荐 打印 | 阅读:
相关新闻       kafka  flume  spark streaming