【问题标题】:how to create customised user defined aggregate distinct function如何创建自定义用户定义的聚合不同功能
【发布时间】:2017-10-19 13:33:56
【问题描述】:

我有一个包含 4 列的数据框。

数据框示例

id1 id2 id3 id4
---------------
a1  a2  a3  a4
b1  b2  b3  b4
b1  b2  b3  b4
c1  c2  c3  c4
    b2      
c1
        a3
            a4
c1
        d4

一行中有两种数据,要么所有列都有数据,要么只有一列。

我想对所有列执行不同的功能,例如在比较行之间的值时,它只会比较一行中存在的值而不考虑空值。

输出数据帧应该是

id1 id2 id3 id4
a1  a2  a3  a4
b1  b2  b3  b4
c1  c2  c3  c4
        d4

我在 spark 中查看了多个 UDAF 示例。但无法修改。

【问题讨论】:

  • 为什么不对所有四列都使用过滤方法?
  • 过滤器如何帮助我?
  • 请看下面我的回答
  • 请不要在得到答案后修改您的问题 - 这可能会使它们无效。最好打开一个新问题。
  • 只是加点容易理解。

标签: scala apache-spark apache-spark-sql spark-dataframe


【解决方案1】:

您可以将filter 用于以下所有列

df.filter($"id1" =!= "" && $"id2" =!= "" && $"id3" =!= "" && $"id4" =!= "")

你应该得到你的最终数据帧。

以上代码适用于静态四列数据框。如果上面的方法超过四列,就会变得很忙,因为你必须编写太多的逻辑检查。

解决方案是使用udf 函数,如下所示

import org.apache.spark.sql.functions._
def checkIfNull = udf((co : mutable.WrappedArray[String]) => !(co.contains(null) || co.contains("")))
df.filter(checkIfNull(array(df.columns.map(col): _*))).show(false)

希望回答对你有帮助

【讨论】:

  • 感谢您的回答,但首先我需要检查另一行是否存在某一行的值,然后仅将其删除,否则保留该行。查看修改后的问题。
【解决方案2】:

可以利用dropDuplicates 依赖于订单来解决此问题,请参阅答案here。不过效率不是很高,应该有更高效的解决方案。

首先使用distinct() 删除所有重复项,然后按每列迭代排序并删除重复项。列按降序排列为nulls,然后将放在最后。

四个静态列的示例:

val df2 = df.distinct()
  .orderBy($"id1".desc).dropDuplicates("id1")
  .orderBy($"id2".desc).dropDuplicates("id2")
  .orderBy($"id3".desc).dropDuplicates("id3")
  .orderBy($"id4".desc).dropDuplicates("id4")

【讨论】:

  • 感谢您的回答,但首先我需要检查另一行是否存在某一行的值,然后仅将其删除,否则保留该行。查看修改后的问题。
  • 空列只包含null,没有""字符串
  • @Kaushal 更新了一个可行的答案,但可能不是最好的解决方案。
  • 是的,这是实现这一目标的方法,但我想我们可以通过单次随机播放来实现这一目标。我的意思是我可以为同一个自定义不同的 UDAF
猜你喜欢
  • 2016-03-07
  • 1970-01-01
  • 2013-01-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-12
  • 2019-03-25
  • 1970-01-01
相关资源
最近更新 更多