你好,游客 登录
背景:
阅读新闻

Structured streaming+kafka集成样例

[日期:2018-11-09] 来源:jianshu  作者:biggeng [字体: ]

关于structured streaming, spark社区已经有很多文章介绍,个人认为其中最大的特点是将流视作没有边界的大表,从而能够使用sql来操作这张表,其中包括使用sql join(截止Spark2.1.1,目前只支持streaming和静态表之间的join,还不支持多条流之间的join ‘期待社区大招 ’)。

消息队列 Kafka

Kafka作为一个开源的消息队列,在流处理应用中应用非常广泛,比较典型的架构是: Storm + kafka + Redis 或者 Spark Streaming + Kafka + Redis。Kafka一般作为数据源,流计算引擎例如Storm或者Spark streaming从Kafka中读取数据,这样保证了流处理数据源的低延迟。在Structured Streaming之前,已经很好的和Spark做了集成。

管中窥豹之 Structured Streaming

尽管在Spark2.1.1中Structured streaming还处于Alpha版本,但是已经提供了一些example程序,可以试着跑跑。本文使用2.1.1中提供的structured streaming集成Kafka的样例 - StructuredKafkaWordCount
详细的代码请戳: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
具体来看,本例提供了一个简单的Structured Streaming wordCount程序,从Kafka中读取文本数据,然后在流中对读入的文本进行word拆分并统计各word出现的次数。

代码解析

  1. 创建Spark Session. 无需多言,每个Spark程序都需要做的步骤。
    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .getOrCreate()
  1. 读入Kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

format("kafka") 表示读取的类型是Kafka,kafka.bootstrap.servers 用来设置Kafka的broker。subscribeType 表示订阅类型,包括'assign', 'subscribe', 'subscribePattern'。

  • 如果设置为assign, 那么相应的值就要设置为消费的topic及partition,例如{"topicA":[0,1],"topicB":[2,4]}。
  • 如果设置为subscribe,相应的值为消费的topic list.
  • 如果设置为subscribePattern,相应的正则表达式描述
  1. wordcount
 val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

对每行的字符串分割成各个word, 然后count得到单词出现的次数。

  1. 输出
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

把结果输出到终端,输出模式为complete模式。

运行

Spark2.1.1发布的包中已经包含了此例子,直接在spark环境中提交作业就可以

  • 提交命令
     

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --master yarn-client --class org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount /usr/hdp/2.6.0.3-8/spark2/examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar ai01:6667 subscribe test
```
其中test topic为从某个文本中读取到kafka中的内容。

tail -n 0 -f zookeeper.log | /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --broker-list ai01:6667 --sync --topic test
  • Spark界面查看作业
    从作业监控看到,目前structured streaming的作业并不像之前的spark streaming作业一样有个streaming的标签,可以看到每个batch读取的记录以及处理情况展示。不知道是还有待完善还是以后就这样了. :)




作者:biggeng
链接:https://www.jianshu.com/p/3fa78a5603b0
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
收藏 推荐 打印 | 阅读: