你好,游客 登录
背景:
阅读新闻

spark dataframe和dataSet用电影点评数据实战

[日期:2018-04-26] 来源:csdn  作者: a11a22334455 [字体: ]

RDD

优点:

  1. 编译时类型安全 
    编译时就能检查出类型错误
  2. 面向对象的编程风格 
    直接通过类名点的方式来操作数据

缺点:

  1. 序列化和反序列化的性能开销 
    无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化.
  2. GC的性能开销 
    频繁的创建和销毁对象, 势必会增加GC
 

DataFrame

DataFrame引入了schema和off-heap

  • schema : RDD每一行的数据, 结构都是一样的. 这个结构就存储在schema中. Spark通过schame就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了.

  • off-heap : 意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存. 由于Spark理解schema, 所以知道该如何操作.

off-heap就像地盘, schema就像地图, Spark有地图又有自己地盘了, 就可以自己说了算了, 不再受JVM的限制, 也就不再收GC的困扰了.

通过schema和off-heap, DataFrame解决了RDD的缺点, 但是却丢了RDD的优点. DataFrame不是类型安全的, API也不是面向对象风格的.

DataSet

DataSet结合了RDD和DataFrame的优点, 并带来的一个新的概念Encoder

当序列化数据时, Encoder产生字节码与off-heap进行交互, 能够达到按需访问数据的效果, 而不用反序列化整个对象. Spark还没有提供自定义Encoder的API, 但是未来会加入.

 

package com.dt.spark.sparksql



import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}






/**
  * 电影点评系统用户行为分析:用户观看电影和点评电影的所有行为数据的采集、过滤、处理和展示:
  *   数据采集:企业中一般越来越多的喜欢直接把Server中的数据发送给Kafka,因为更加具备实时性;
  *   数据过滤:趋势是直接在Server端进行数据过滤和格式化,当然采用Spark SQL进行数据的过滤也是一种主要形式;
  *   数据处理:
  *     1,一个基本的技巧是,先使用传统的SQL去实现一个下数据处理的业务逻辑(自己可以手动模拟一些数据);
  *     2,再一次推荐使用DataSet去实现业务功能尤其是统计分析功能;
  *     3,如果你想成为专家级别的顶级Spark人才,请使用RDD实现业务功能,为什么?运行的时候是基于RDD的!
  *
  *  数据:强烈建议大家使用Parquet
  *  1,"ratings.dat":UserID::MovieID::Rating::Timestamp
  *  2,"users.dat":UserID::Gender::Age::OccupationID::Zip-code
  *  3,"movies.dat":MovieID::Title::Genres
  *  4, "occupations.dat":OccupationID::OccupationName   一般情况下都会以程序中数据结构Haskset的方式存在,是为了做mapjoin
  */
object Movie_Users_Analyzer_DateSet {


  case class User(UserID:String, Gender:String, Age:String, OccupationID:String, Zip_Code:String)
  case class Rating(UserID:String, MovieID:String, Rating:Double, Timestamp:String)
  case class Movie(MovieID:String, Title:String, Genres:String)




  def main(args: Array[String]){








    Logger.getLogger("org").setLevel(Level.ERROR)


    var masterUrl = "local[8]" //默认程序运行在本地Local模式中,主要学习和测试;
    var dataPath = "moviedata/medium/"  //数据存放的目录;


    /**
      * 当我们把程序打包运行在集群上的时候一般都会传入集群的URL信息,在这里我们假设如果传入
      * 参数的话,第一个参数只传入Spark集群的URL第二个参数传入的是数据的地址信息;
      */
    if(args.length > 0) {
      masterUrl = args(0)
    } else if (args.length > 1) {
      dataPath = args(1)
    }




    /**
      * 创建Spark会话上下文SparkSession和集群上下文SparkContext,在SparkConf中可以进行各种依赖和参数的设置等,
      * 大家可以通过SparkSubmit脚本的help去看设置信息,其中SparkSession统一了Spark SQL运行的不同环境。
      */
    val sparkConf = new SparkConf().setMaster(masterUrl).setAppName("Movie_Users_Analyzer_DataSet")


    /**
      * SparkSession统一了Spark SQL执行时候的不同的上下文环境,也就是说Spark SQL无论运行在那种环境下我们都可以只使用
      * SparkSession这样一个统一的编程入口来处理DataFrame和DataSet编程,不需要关注底层是否有Hive等。
      */
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()


    val sc = spark.sparkContext //从SparkSession获得的上下文,这是因为我们读原生文件的时候或者实现一些Spark SQL目前还不支持的功能的时候需要使用SparkContext


    import spark.implicits._
    /**
      * 读取数据,用什么方式读取数据呢?在这里是使用RDD!
      */
    val usersRDD = sc.textFile(dataPath + "users.dat")
    val moviesRDD = sc.textFile(dataPath + "movies.dat")
    val occupationsRDD = sc.textFile(dataPath + "occupations.dat")
    val ratingsRDD = sc.textFile(dataPath + "ratings.dat")




    /**
      * 功能一:通过DataFrame实现某特定电影观看者中男性和女性不同年龄分别有多少人?
      *   1,从点评数据中获得观看者的信息ID;
      *   2,把ratings和users表进行join操作获得用户的性别信息;
      *   3,使用内置函数(内部包含超过200个内置函数)进行信息统计和分析
      *  在这里我们通过DataFrame来实现:首先通过DataFrame的方式来表现ratings和users的数据,然后进行join和统计操作
      */


    println("功能一:通过DataFrame实现某特定电影观看者中男性和女性不同年龄分别有多少人?")
    val schemaforusers = StructType("UserID::Gender::Age::OccupationID::Zip_Code".split("::").
      map(column => StructField(column, StringType, true))) //使用Struct方式把Users的数据格式化,即在RDD的基础上增加数据的元数据信息
    val usersRDDRows = usersRDD.map(_.split("::")).map(line => Row(line(0).trim,line(1).
      trim,line(2).trim,line(3).trim,line(4).trim)) //把我们的每一条数据变成以Row为单位的数据
    val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaforusers)  //结合Row和StructType的元数据信息基于RDD创建DataFrame,这个时候RDD就有了元数据信息的描述
    val usersDataSet = usersDataFrame.as[User]




    val schemaforratings = StructType("UserID::MovieID".split("::").
      map(column => StructField(column, StringType, true))).
      add("Rating", DoubleType, true).
      add("Timestamp",StringType, true)


    val ratingsRDDRows = ratingsRDD.map(_.split("::")).map(line => Row(line(0).trim,line(1).
      trim,line(2).trim.toDouble,line(3).trim))
    val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings)
    val ratingsDataSet = ratingsDataFrame.as[Rating]




    val schemaformovies = StructType("MovieID::Title::Genres".split("::").
      map(column => StructField(column, StringType, true))) //使用Struct方式把Users的数据格式化,即在RDD的基础上增加数据的元数据信息
    val moviesRDDRows = moviesRDD.map(_.split("::")).map(line => Row(line(0).trim,line(1).
      trim,line(2).trim)) //把我们的每一条数据变成以Row为单位的数据
    val moviesDataFrame = spark.createDataFrame(moviesRDDRows, schemaformovies)  //结合Row和StructType的元数据信息基于RDD创建DataFrame,这个时候RDD就有了元数据信息的描述
    val moviesDataSet = moviesDataFrame.as[Movie]


    println
    ratingsDataFrame.filter(s" MovieID = 1193") //这里能够直接指定MovieID的原因是DataFrame中有该元数据信息!
          .join(usersDataFrame, "UserID") //Join的时候直接指定基于UserID进行Join,这相对于原生的RDD操作而言更加方便快捷
          .select("Gender", "Age")  //直接通过元数据信息中的Gender和Age进行数据的筛选
          .groupBy("Gender", "Age") //直接通过元数据信息中的Gender和Age进行数据的groupBy操作
          .count()  //基于groupBy分组信息进行count统计操作
          .show(10) //显示出分组统计后的前10条信息
    println("功能一:通过DataSet实现某特定电影观看者中男性和女性不同年龄分别有多少人?")
    ratingsDataSet.filter(s" MovieID = 1193") //这里能够直接指定MovieID的原因是DataFrame中有该元数据信息!
      .join(usersDataFrame, "UserID") //Join的时候直接指定基于UserID进行Join,这相对于原生的RDD操作而言更加方便快捷
      .select("Gender", "Age")  //直接通过元数据信息中的Gender和Age进行数据的筛选
      .groupBy("Gender", "Age") //直接通过元数据信息中的Gender和Age进行数据的groupBy操作
      .count()  //基于groupBy分组信息进行count统计操作
      .show(10) //显示出分组统计后的前10条信息
    /**
      * 功能二:用SQL语句实现某特定电影观看者中男性和女性不同年龄分别有多少人?
      * 1,注册临时表,写SQL语句需要Table;
      * 2,基于上述注册的零时表写SQL语句;
      */
    println("功能二:用GlobalTempView的SQL语句实现某特定电影观看者中男性和女性不同年龄分别有多少人?")
    ratingsDataFrame.createGlobalTempView("ratings")
    usersDataFrame.createGlobalTempView("users")




    spark.sql("SELECT Gender, Age, count(*) from  global_temp.users u join  global_temp.ratings as r on u.UserID = r.UserID where MovieID = 1193" +
      " group by Gender, Age").show(10)


    println("功能二:用LocalTempView的SQL语句实现某特定电影观看者中男性和女性不同年龄分别有多少人?")
    ratingsDataFrame.createTempView("ratings")
    usersDataFrame.createTempView("users")




    spark.sql("SELECT Gender, Age, count(*) from  users u join  ratings as r on u.UserID = r.UserID where MovieID = 1193" +
      " group by Gender, Age").show(10)


    /**
      * 功能三:使用DataFrame进行电影流行度分析:所有电影中平均得分最高(口碑最好)的电影及观看人数最高的电影(流行度最高)
      * "ratings.dat":UserID::MovieID::Rating::Timestamp
      * 得分最高的Top10电影