【问题标题】:how to update spark dataframe column containing array using udf如何使用udf更新包含数组的spark数据框列
【发布时间】:2019-10-29 06:06:43
【问题描述】:

我有一个数据框:

+--------------------+------+
|people              |person|
+--------------------+------+
|[[jack, jill, hero]]|joker |
+--------------------+------+

这是架构:

root
 |-- people: struct (nullable = true)
 |    |-- person: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- person: string (nullable = true)

这里,root--person 是一个字符串。所以,我可以使用 udf 更新这个字段:

def updateString = udf((s: String) => {
    "Mr. " + s
})
df.withColumn("person", updateString(col("person"))).select("person").show(false)

输出:

+---------+
|person   |
+---------+
|Mr. joker|
+---------+

我想对包含人员数组的 root--people--person 列执行相同的操作。如何使用 udf 实现这一点?

def updateArray = udf((arr: Seq[Row]) => ???
df.withColumn("people", updateArray(col("people.person"))).select("people").show(false)

预期:

+------------------------------+
|people                        |
+------------------------------+
|[Mr. hero, Mr. jack, Mr. jill]|
+------------------------------+

编辑:我还想在更新 root--people--person 后保留其架构。

人的预期模式:

df.select("people").printSchema()

root
 |-- people: struct (nullable = false)
 |    |-- person: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

谢谢,

【问题讨论】:

    标签: scala apache-spark apache-spark-sql user-defined-functions


    【解决方案1】:

    这里的问题是 people 是只有 1 个字段的结构。在您的 UDF 中,您需要返回 Tuple1,然后进一步转换您的 UDF 的输出以保持名称正确:

    def updateArray = udf((r: Row) => Tuple1(r.getAs[Seq[String]](0).map(x=>"Mr."+x)))
    
    val newDF = df
      .withColumn("people",updateArray($"people").cast("struct<person:array<string>>"))
    
    newDF.printSchema()
    newDF.show()
    

    给予

    root
     |-- people: struct (nullable = true)
     |    |-- person: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |-- person: string (nullable = true)
    
    
    +--------------------+------+
    |              people|person|
    +--------------------+------+
    |[[Mr.jack, Mr.jil...| joker|
    +--------------------+------+
    

    【讨论】:

    • 你也可以在 PySpark 中回答吗?
    【解决方案2】:

    因为您只需要更新您的功能,一切都保持不变。 这是代码sn-p。

    scala> df2.show
    +------+------------------+
    |people|            person|
    +------+------------------+
    | joker|[jack, jill, hero]|
    +------+------------------+
    //jus order is changed
    I just updated your function instead of using Row I am using here Seq[String]
    
    scala> def updateArray = udf((arr: Seq[String]) => arr.map(x=>"Mr."+x))
    scala> df2.withColumn("test",updateArray($"person")).show(false)
    +------+------------------+---------------------------+
    |people|person            |test                       |
    +------+------------------+---------------------------+
    |joker |[jack, jill, hero]|[Mr.jack, Mr.jill, Mr.hero]|
    +------+------------------+---------------------------+
    //keep all the column for testing purpose you could drop if you dont want.
    

    如果您想了解更多信息,请告诉我。

    【讨论】:

    • 谢谢@Mahesh,这很有魅力,但我也想保留它的模式。我已经更新了这个问题。能否请您查看并更新答案。
    • 您的输入是 [jack, jill, hero] 并且您希望输出为 [Mr.英雄,杰克先生,吉尔先生] 对吗?
    【解决方案3】:

    让我们为测试创建数据

    scala> val data = Seq((List(Array("ja", "ji", "he")), "person")).toDF("people", "person")
    data: org.apache.spark.sql.DataFrame = [people: array<array<string>>, person: string]
    
    scala> data.printSchema
    root
     |-- people: array (nullable = true)
     |    |-- element: array (containsNull = true)
     |    |    |-- element: string (containsNull = true)
     |-- person: string (nullable = true)
    

    根据我们的要求创建 UDF

    scala> def arrayConcat(array:Seq[Seq[String]], str: String) = array.map(_.map(str + _))
    arrayConcat: (array: Seq[Seq[String]], str: String)Seq[Seq[String]]
    
    scala> val arrayConcatUDF = udf(arrayConcat _)
    arrayConcatUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(ArrayType(StringType,true),true), StringType)))
    

    应用 udf

    scala> data.withColumn("dasd", arrayConcatUDF($"people", lit("Mr."))).show(false)
    +--------------------------+------+-----------------------------------+
    |people                    |person|dasd                               |
    +--------------------------+------+-----------------------------------+
    |[WrappedArray(ja, ji, he)]|person|[WrappedArray(Mr.ja, Mr.ji, Mr.he)]|
    +--------------------------+------+-----------------------------------+
    

    您可能需要稍作调整(我认为几乎不需要任何调整),但这包含了解决您的问题的大部分内容

    【讨论】:

    • 这不是正确的架构(您的输入数据)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-09
    • 2018-02-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-20
    相关资源
    最近更新 更多