你好,游客 登录
背景:
阅读新闻

spark dataframe操作集锦(提取前几行,合并,入库等)

[日期:2017-09-23] 来源:csdn  作者:sparkexpert [字体: ]

1. 添加依赖文件到Scala Maven工程

<dependency>  
    <groupid>com.databricks</groupid>  
    <artifactid>spark-csv_2.10</artifactid>  
    <version>1.3.0</version>  
</dependency>  

2.从csv文件创建DataFrame

import com.databricks.spark.csv._
val conf = new SparkConf().setAppName(“csvDataFrame”).setMaster(“local”)
val sc  = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val data=sqlContext.csvFile(filePath=””,useHeader=true,delimiter=’’)

3 操作DataFrame
1.data.printSchema:打印出每一列的名称和类型
2.data.show(numRows:Int):
3.Data.head(n:int).foreach(println): 这个函数也需要输入一个参数标明需要采样的行数,而且这个函数返回的是Row数组,我们需要遍历打印。
4.Select函数可以帮助我们从DataFrame中选择需要的列,并且返回一个全新的DataFrame:(可以选择多列)
如:val age: DataFrame=data.select(“age”)
5.根据条件过滤数据:
data.filter(“id>2”).show(2)
data.fileter(“NAME=’’”).show(2)
data.fileter(“NAME=’’ OR NAME=’NULL’”).show(2)(多条件过滤)
6.排序
data.sort(data(“Name”).desc).show(7) data.sort(“NAME”, “id”).show(10)
7.将DataFrame保存成文件
1.首先创建一个map对象,用于存储一些save函数需要用到的一些属性。这里我将制定保存文件的存放路径和csv的头信息。
Val saveOptions=Map(“Header”->”true”,”path”=””)
2.为了基于学习的态度,我们从DataFrame里面选择出studentName和email两列,并且将studentName的列名重定义为name。
Val copyOfStudents = students.select(students(“name”),students(“age”))
3.下面我们调用save函数保存上面的DataFrame数据到iteblog.csv文件夹中
CopyOfStudents.write.format(“com.databricks.spark.csv”).mode(SaveMode.overwrite).options(SaveOptions).save()
mode函数可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。从名字就可以很好的理 解,Overwrite代表覆盖目录下之前存在的数据;Append代表给指定目录下追加数据;Ignore代表如果目录下已经有文件,那就什么都不执 行;ErrorIfExists代表如果保存目录下存在文件,那么抛出相应的异常。需要注意的是,上述path参数指定的是保存文件夹,并不是最后的保存 文件名
例:

   val saveOptions=Map("Header"->"true","path"->"C:\\Users\\REN\\Desktop\\df2.csv")
val copyOfData= data.select(data("NAME"),data("YEAR"))
copyOfData.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOptions).save()
收藏 推荐 打印 | 阅读: