spark
Spark DataFrame提供了union函数用于将两个DataFrame合并。虽然经常使用,但是没有仔细读过官方文档,今天合并DataFrame时出了点错误,仔细看了API文档之后,发现使用时有一些坑需要注意。
def union(other: Dataset[T]): Dataset[T]
Returns a new Dataset containing union of rows in this Dataset and another Dataset.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
Also as standard in SQL, this function resolves columns by position (not by name).
从API文档中看,需要注意两点:
union操作和集合的并集并不等价,因为它不会去除重复数据。
union函数并不是按照列名和并得,而是按照位置合并的。即DataFrame的列名可以不相同,但对应位置的列将合并在一起。
下面是使用了例子:
object Union {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkSQL Union Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df1 = List[(String, String, String)](
("io.toutiao.bigdatacoder", "大数据", "person1"),
("com.eggpain.zhongguodashujuwang1457", "中国大数据", "person2"),
("com.cnfsdata.www", "房产大数据", "person3")
).toDF("package_name", "app_name", "user")
val df2 = List[(String, String, String)](
("com.jh.APP500958.news", null, "person4"),
("com.eggpain.zhongguodashujuwang1457", "中国大数据", "person2")
).toDF("package_name", "name", "user")
//df1与df2合并,不去重。列名不同并不影响合并
df1.union(df2).show()
//df1与df2合并,使用distinct去重。列名不同并不影响合并
df1.union(df2).distinct().show()
//df1与df3合并,注意df3与df1列数不同
val df3 = List[(String, String)](
("com.jh.APP500958.news", "person4"),
("com.eggpain.zhongguodashujuwang1457", "person2")
).toDF("package_name", "user")
//为df3增加一列,同时注意顺序,因为union合并是按照位置而不是列名
df1.union(df3.select($"package_name", lit(null).alias("app_name"), $"user")).show()
//虽然列数相同且类型匹配,但对应列位置不对
df1.union(df3.withColumn("app_name", lit(null))).show()
}
}
1. 合并两个相同的DataFrame结果
+--------------------+--------+------------+
| package_name|app_name| user|
+--------------------+--------+------------+
|io.toutiao.bigdat...| 大数据|person1|
|com.eggpain.zhong...| 中国大数据|person2|
| com.cnfsdata.www| 房产大数据|person3|
|com.jh.APP500958....| null|person4|
|com.eggpain.zhong...| 中国大数据|person2|
+--------------------+--------+-------+
2. 合并两个相同的DataFrame并去重的结果
+--------------------+--------+-------+
| package_name|app_name| user|
+--------------------+--------+-------+
|io.toutiao.bigdat...| 大数据|person1|
|com.jh.APP500958....| null|person4|
|com.eggpain.zhong...| 中国大数据|person2|
| com.cnfsdata.www| 房产大数据|person3|
+--------------------+--------+-------+
3. 列数不同时首先变换成相同列,并用select调整为相同顺序再合并结果
+--------------------+--------+-------+
| package_name|app_name| user|
+--------------------+--------+-------+
|io.toutiao.bigdat...| 大数据|person1|
|com.eggpain.zhong...| 中国大数据|person2|
| com.cnfsdata.www| 房产大数据|person3|
|com.jh.APP500958....| null|person4|
|com.eggpain.zhong...| null|person2|
+--------------------+--------+-------+
4. union函数并不检查列名是否相同,而是直接按照位置合并
+--------------------+--------+-------+
| package_name|app_name| user|
+--------------------+--------+-------+
|io.toutiao.bigdat...| 大数据|person1|
|com.eggpain.zhong...| 中国大数据|person2|
| com.cnfsdata.www| 房产大数据|person3|
|com.jh.APP500958....| person4| null|
|com.eggpain.zhong...| person2| null|
+--------------------+--------+-------+