【问题标题】:Scala dataframe calling function inside withColumn?withColumn中的Scala数据框调用函数?
【发布时间】:2017-07-25 19:41:46
【问题描述】:

这就是我想要做的,我有两个表具有完全相同的列名。

表格看起来有点像这样:

-----------
A  B  C  D 
-----------
1  2  3  4
5  6  3  4
7  8  3  4

我需要解决的问题的逻辑是,将 Table1 中的 A B C D 列与 Table2 进行比较。如果A,B相互匹配,则返回一个值为0的新列,否则返回0。如果表A中的C为3,则返回0,否则返回1。每行只应返回一个值,优先级:C>D >A=B。

我加入了两个表(dataFrames),结果是一个combinedDf。这就是我加入他们的方式:Table1.join(Table2,table1($"A")=table2($"A"))

这就是我所做的:

def func(A:mutable.WrappedArray[String],B:mutable.WrappedArray[String],C:String,D:String) = 
{if(C=="3") "0";
 else if(D=="4")"1";
 else if ((0 to A.length-1).exists(i => A(i) == B(i)))"0" else "1"}

对于这个函数,我想将 Table1 中的 A、B 列放入一个数组中,将 Table2 中的 A、B 列放入另一个数组中,并运行一个 for 循环来检查相等性。 (我需要数组,因为对于实际情况,我需要比较随机数量的列)。

这是我调用函数的方式。

combinedDf.withColumn("returnVal",func(array(col("table1.A"),col("table1.B")),
array(col("table2.A"),col("table2.B")),col("table1.C"),col("table1.D")))

但它只是不起作用,即使我使用数组函数将列放在数组中,它仍然告诉我类型不匹配。

错误信息:<console:67>: error: type mismatch; found:org.apache.spark.Column required: String 提前致谢!

【问题讨论】:

  • 能否请您发送错误消息的第一部分?
  • 你是如何加入这两个数据框的?请也更新该部分。
  • @Ramesh Maharjan 已更新
  • 加入是否有效?我想这不是因为您的 join 语句完全错误。
  • 根据连接看来,两个数据框的 A 列都匹配用于连接目的。这是真的吗?

标签: scala apache-spark


【解决方案1】:

你可以试试这个,但是帮助我理解为什么你需要组合数据框,以及如果 A 和 B 匹配,你的意思是什么(我的假设是每行,对吗?),如果你的 A, B、C、D 列是字符串,然后将整数更改为字符串。

def func(A:Integer,B:Integer,C:Integer,D:Integer) = 
    {
     if(C == 3) "0"
     else if(D == 4) "1"
     else if (A == B) "0"
     else "1"
    }
    val udfFunc = udf(func _)

    combinedDf.withColumn("returnVal",
      udfFunc(col("table1.A"), col("table1.B"), 
              col("table1.C"),col("table1.D")
             )
      )

【讨论】:

  • 就像我说的我需要数组类型,因为在实际情况下我会有随机数量的列,所以我不能只在我的 udf 中定义 50 个参数。加入是因为 udf 不适用于两个数据帧,我给 udf 的任何数据都必须来自同一个数据帧
  • 然后你可以使用 concat_ws(",",Seq(col("table1.A", col("table1.B")) 作为第一个参数和类似的第二个参数。并且首先在 UDF两个参数是字符串,如果你的函数首先用逗号(,)分割列来获取数组,那么你的代码应该可以正常工作。这样你可以组合动态的列值并作为一个字符串传递,然后在您的 udf 中,您可以解析它获取数组并执行逻辑。希望对您有所帮助。
  • 还是输入错误,说找到:Seq[org.apache.spark.sql.Column] required: org.apache.spark.sql.Column
猜你喜欢
  • 2017-10-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-18
  • 2017-05-15
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多