Spark SQL的聚合函数中有first, last函数,从字面意思就是根据分组获取第一条和最后一条记录的值,实际上,只在local模式下,你可以得到满意的答案,但是在生产环境(分布式)时,这个是不能保证的。看源码的解释:
复制代码
1
2
3
4
5
6
7/** * Returns the first value of `child` for a group of rows. If the first value of `child` * is `null`, it returns `null` (respecting nulls). Even if [[First]] is used on an already * sorted column, if we do partial aggregation and final aggregation (when mergeExpression * is used) its result will not be deterministic (unless the input table is sorted and has * a single partition, and we use a single reducer to do the aggregation.). */
如何保证first, last是有效呢?表要排好序的,同时只能用一个分区处理,再用一个reducer来聚合。。。
所以,在多分区场景不能用first, last函数求得聚合的第一条和最后一条数据。
解决方案:利用Window。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16val spark = SparkSession.builder().master("local").appName("Demo").getOrCreate() import spark.implicits._ val df = Seq(("a", 10, 12345), ("a", 12, 34567), ("a", 11, 23456), ("b", 10, 55555), ("b", 8, 12348)).toDF("name", "value", "event_time") // 定义window val asc = Window.partitionBy("name").orderBy($"event_time") val desc = Window.partitionBy("name").orderBy($"event_time".desc) // 根据window生成row_number,根据row_number获取对应的数据 val firstValue = df.withColumn("rn", row_number().over(asc)).where($"rn" === 1).drop("rn") val lastValue = df.withColumn("rn", row_number().over(desc)).where($"rn" === 1).drop("rn") // 利用join把数据聚合一起 df.groupBy("name") .count().as("t1") .join(firstValue.as("fv"), "name") .join(lastValue.as("lv"), "name") .select($"t1.name", $"fv.value".as("first_value"), $"lv.value".as("last_value"), $"t1.count") .show()
输出:
复制代码
1
2
3
4
5
6+----+-----------+----------+-----+ |name|first_value|last_value|count| +----+-----------+----------+-----+ | b| 8| 10| 2| | a| 10| 12| 3| +----+-----------+----------+-----+
最后
以上就是凶狠台灯最近收集整理的关于Spark first, last函数的坑的全部内容,更多相关Spark内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复