【问题标题】:Process all columns / the entire row in a Spark UDF处理 Spark UDF 中的所有列/整行
【发布时间】:2018-08-27 00:17:24
【问题描述】:

对于包含字符串和数字数据类型混合的数据框,目标是创建一个新的features 列,它是所有这些类型的minhash

虽然这可以通过执行dataframe.toRDD 来完成,但当下一步只是将RDD 返回 转换为数据帧时,这样做的成本很高。

那么有没有办法按照以下方式进行udf

val wholeRowUdf = udf( (row: Row) =>  computeHash(row))

Row 当然不是spark sql 数据类型 - 所以这不会像所示的那样工作。

更新/澄清我意识到创建在withColumn 中运行的全行UDF 很容易。不太清楚的是可以在spark sql 语句中使用什么:

val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features 
                              from mytable")

【问题讨论】:

  • 谁说Row不是spark sql。?你的 udf 函数是正确的
  • @RameshMaharjan 在spark sql 语句中如何调用“整行”?例如select wholeRowUdf( what goes here?? ) from ..
  • 只需使用 struct 内置函数将所有列合二为一并传递给 udf 函数。就这么简单。
  • @RameshMaharjan 随时回答说明您的观点。你在 cmets 中表达的方式我不知道你是在暗示“这很明显 - 你怎么看不到这个”或“这是一种简单的方法”。
  • 我已经在下面回答了:) 请检查

标签: scala apache-spark apache-spark-sql


【解决方案1】:

Row 当然不是 spark sql 数据类型 - 所以这不会如图所示。

我将展示您可以使用 Row 将所有列或选定的列传递给使用 struct 内置函数的 udf 函数

首先我定义一个dataframe

val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+

然后我定义 一个函数,将一行中的所有元素作为一个字符串,由, 分隔(因为你有 computeHash 函数)

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")

然后我在udf函数中使用它

import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))

最后,我使用withColumn 函数和struct 内置函数调用udf 函数 将选定的列合并为一列并传递给udf 函数

df.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+-------------+
//    |col1|col2|col3|contcatenated|
//    +----+----+----+-------------+
//    |a   |b   |c   |a, b, c      |
//    |a1  |b1  |c1  |a1, b1, c1   |
//    +----+----+----+-------------+

所以你可以看到 Row 可以用来将整行作为参数传递

您甚至可以一次传递一行中的所有列

val columns = df.columns
df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))

更新

您也可以使用 sql 查询实现相同的目的,您只需将 udf 函数注册为

df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")

它会给你和上面一样的结果

现在,如果您不想硬编码列名,那么您可以根据需要选择列名并将其设为字符串

val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")

希望回答对你有帮助

【讨论】:

  • 我正在寻找一个支持 sql 语句 的构造。将更新 OP
  • 如果我们有两打列可能会发生变化.. 我宁愿不按照您展示的对列进行硬编码的方式进行操作:最好以某种方式“发现”列名。如果你能提供,那么我会奖励。
  • 我在之前的更新中已经给你提示过选择变量列名而不是硬编码。看看我的更新,我已经完成了你的每一个挑战。 :) 。对于我的辛勤工作,你会给我什么奖励;) 25 分?哈哈哈。如果您有进一步的挑战,请告诉我:P
  • 我的最后一条评论没有得到回复。
  • 哦,我没有注意到您已更新为包含 combineUdf 。我会投票认为可能有用。你认为你的方法比我发布的更简单吗?
【解决方案2】:

我想出了一个解决方法:将列名放到任何现有的 spark sql 函数中以生成新的输出列:

concat(${df.columns.tail.mkString(",'-',")}) as Features

在这种情况下,数据框中的第一列是目标并被排除在外。这是这种方法的另一个优点:许多列的实际列表可以被操纵。

这种方法避免了对 RDD/数据帧进行不必要的重组。

【讨论】:

    猜你喜欢
    • 2017-01-24
    • 2020-11-14
    • 2020-11-13
    • 1970-01-01
    • 1970-01-01
    • 2023-03-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多