你好,游客 登录
背景:
阅读新闻

基于spark的DataFrame实战

[日期:2018-04-26] 来源:iteye  作者:bo_hai [字体: ]

Spark 中的另一核心功能是DataFrame,方便处理结构化数据。实例中还是以上一篇博客中的数据为基础。

我们要求以下数据:

1、查看338用户的评分记录;

2、将结果保存成csv格式;

3、评论电影最多的用户id;

4、被用户评论最多的电影id、title;

5、评论电影年龄最小者、最大者;

6、25至30岁的用户最喜欢的电影;

7、最受用户喜爱的电影;

代码如下:

 

Java代码  收藏代码
  1. import org.apache.spark.sql.{DataFrame, SQLContext} 
  2. import org.apache.spark.{SparkConf, SparkContext} 
  3.  
  4.  
  5. /** 
  6.   * 更多内容请参考:http://www.iteblog.com/archives/1566#DataFrame-4 
  7.   * 
  8.   */ 
  9. object MoviesDataStatistics { 
  10.  
  11.   case class Ratings(userId: Int, movieId: Int, rating: Double) 
  12.  
  13.   case class Movies(id: Int, movieTitle: String, releaseDate: String) 
  14.  
  15.   case class Users(id: Int, age: Int, gender: String) 
  16.  
  17.   def main(args: Array[String]) { 
  18.     val conf = new SparkConf().setAppName("MoviesDataStatistics"
  19.     val sc = new SparkContext(conf) 
  20.     val sqlContext = new SQLContext(sc) 
  21.     import sqlContext.implicits._ 
  22.     val ratingsDF: DataFrame = sc.textFile("/data/ratings.data").map(x => x.split("::")).map(line => Ratings(line(0).toInt, line(1).toInt, line(2).toDouble)).toDF() 
  23.     ratingsDF.registerTempTable("ratings"
  24.     //查看338评分记录条数 
  25.     println("sql for 338 rateing info is : "
  26.     sqlContext.sql("select * from ratings where userId = 338").show() 
  27.     println("dataframe 338 rateing info is : "
  28.     ratingsDF.filter(ratingsDF("userId").equalTo(338)).show() 
  29.  
  30.     val userDataDF = sc.textFile("/data/user.data").map(x => x.split("[|]")).map(line => Users(line(0).toInt, line(1).toInt, line(2))).toDF() 
  31.     userDataDF.registerTempTable("users"
  32.     sqlContext.sql("select * from users where id = 338").show() 
  33.     userDataDF.filter(userDataDF("id").equalTo(338)).show() 
  34.  
  35.     val movieDF = sc.textFile("/data/movies.data").map(x => x.split("::")).map(line => Movies(line(0).toInt, line(1), line(2))).toDF() 
  36.     movieDF.registerTempTable("movies"
  37.     movieDF.collect() 
  38.     sqlContext.sql("select * from movies where id = 1").show() 
  39.     movieDF.filter(movieDF("id").equalTo(1)).show() 
  40.  
  41.     sqlContext.sql("select r.userId,m.movieTitle,r.rating from movies m inner join ratings r on m.id = r.movieId and r.userId = 338 order by r.rating desc ").show() 
  42.     val resultDF = movieDF.join(ratingsDF.filter(ratingsDF("userId").equalTo(338)), movieDF("id").equalTo(ratingsDF("movieId"))) 
  43.       .sort(ratingsDF("rating").desc).select("userId""movieTitle""rating"
  44.  
  45.     resultDF.collect().foreach(println) 
  46.     import org.apache.spark.sql.functions._ 
  47.     //将结果保存至csv格式 
  48.     //val saveOptions = Map("header" -> "true", "path" -> "/data/rat_movie.csv") 
  49.     //resultDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOptions).save() 
  50.     // 评论电影最多的用户id 
  51.     sqlContext.sql("select userId,count(*) as count from ratings group by userId order by count desc ").show(1
  52.     val userIdCountDF = ratingsDF.groupBy("userId").count() 
  53.     userIdCountDF.join(userIdCountDF.agg(max("count").alias("max_count")), $"count".equalTo($"max_count")).select("userId").show(1
  54.  
  55.     // 被用户评论最多的电影id、title 
  56.     val movieIDGroupDF = ratingsDF.groupBy("movieId").count() 
  57.     val movieCountDF = movieIDGroupDF.join(movieIDGroupDF.agg(max("count").alias("max_count"))).filter($"count".equalTo($"max_count")) 
  58.     //星球大战是被用户评论最多的电影 
  59.     movieCountDF.join(movieDF).filter($"movieId".equalTo($"id")).select("movieId""movieTitle""releaseDate").show() 
  60.  
  61.     // 评论电影年龄最小者、最大者 
  62.     // 年龄最大的73岁,最小的7岁 
  63.     ratingsDF.join(userDataDF, ratingsDF("userId").equalTo(userDataDF("id"))) 
  64.       .agg(min($"age").alias("min_age"), max($"age").alias("max_age")) 
  65.       .join(userDataDF, $"age".isin($"min_age", $"max_age")) 
  66.       .select("id""age""gender").show(2
  67.     // https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html 
  68.     // 25至30岁的用户欢迎的电影 
  69.     userDataDF.filter($"age".between(2530)).join(ratingsDF, $"id".equalTo($"userId")) 
  70.       .select("userId""movieId""rating").join(movieDF, $"rating".equalTo(5)).select("movieId""movieTitle").show(10
  71.     // 最受用户喜爱的电影 
  72.     ratingsDF.groupBy("movieId").agg(avg("rating").alias("avg_rate")) 
  73.       .sort($"avg_rate".desc).limit(10
  74.       .join(movieDF, $"movieId".equalTo($"id")) 
  75.       .select("movieTitle").show(false
  76.   } 

 总结:

 

1、创建DF时需要引入import sqlContext.implicits._

2、使用DF函数时,需要import org.apache.spark.sql.functions._

3、DF的函数功能非常强大,基本的函数功能一定要掌握;

4、个人认为DF的功能比Sql的功能强大

参考:

https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice3/

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/GroupedData.html

收藏 推荐 打印 | 阅读: