【问题标题】:PySpark - map with lambda functionPySpark - 带有 lambda 函数的映射
【发布时间】:2019-06-24 14:45:57
【问题描述】:

在 Spark 环境中混合 python map 和 lambda 函数时遇到问题。

给定 df1,我的源数据框:

Animals     | Food      | Home
----------------------------------
Monkey      | Banana    | Jungle
Dog         | Meat      | Garden
Cat         | Fish      | House
Elephant    | Banana    | Jungle
Lion        | Meat      | Desert

我想创建另一个数据框 df2。它将包含两列,每列 df1 一行(在我的示例中为 3)。 第一列将包含 df1 列的名称。第二列将包含出现次数最多的元素数组(在下面的示例中为 n=3)和计数。

Column      | Content
-----------------------------------------------------------
Animals     | [("Cat", 1), ("Dog", 1), ("Elephant", 1)]
Food        | [("Banana", 2), ("Meat", 2), ("Fish", 1)]
Home        | [("Jungle", 2), ("Desert", 1), ("Garden", 1)]

我尝试使用 python 列表、映射和 lambda 函数来实现,但与 PySpark 函数发生冲突:

def transform(df1):
    # Number of entry to keep per row
    n = 3
    # Add a column for the count of occurence
    df1 = df1.withColumn("future_occurences", F.lit(1))

    df2 = df1.withColumn("Content",
        F.array(
            F.create_map(
                lambda x: (x,
                    [
                        str(row[x]) for row in df1.groupBy(x).agg(
                            F.sum("future_occurences").alias("occurences")
                        ).orderBy(
                            F.desc("occurences")
                        ).select(x).limit(n).collect()
                    ]
                ), df1.columns
            )
        )
    )
    return df2

错误是:

TypeError: Invalid argument, not a string or column: <function <lambda> at 0x7fc844430410> of type <type 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

知道怎么解决吗?

非常感谢!

【问题讨论】:

  • 这可以做到,但这并不是 spark 设计的真正问题类型。您可以独立处理每一列并union 结果。你如何断绝关系?为什么是CatDogElephant,而其他两个动物的计数也是1?
  • @PentaKill 我更喜欢发布我的代码来说明我面临的问题。我不明白你为什么说它没用。
  • @pault 感谢您的评论。我是新手,所以我仍然需要学习。是的,我想我可以独立处理列,但我不确定这是最好的解决方案。我打破了字母顺序的联系。这就是我没有展示狮子和猴子的原因。

标签: python pandas apache-spark lambda pyspark


【解决方案1】:

这是一种可能的解决方案,其中Content 列将是StructType 的数组,其中包含两个命名字段:Contentcount

from pyspark.sql.functions import col, collect_list, desc, lit, struct
from functools import reduce 

def transform(df, n):
    return reduce(
        lambda a, b: a.unionAll(b),
        (
            df.groupBy(c).count()\
                .orderBy(desc("count"), c)\
                .limit(n)\
                .withColumn("Column", lit(c))\
                .groupBy("Column")\
                .agg(
                    collect_list(
                        struct(
                            col(c).cast("string").alias("Content"), 
                            "count")
                    ).alias("Content")
                )
            for c in df.columns
        )
    )

此函数将遍历输入 DataFrame df 中的每一列,并计算每个值的出现次数。然后我们orderBy 计数(降序)和它自己的列值(按字母顺序)并只保留第一行nlimit(n))。

接下来,将值收集到一个结构数组中,最后union 将每一列的结果汇总在一起。由于union 要求每个 DataFrame 具有相同的架构,因此您需要将列值转换为字符串。

n = 3
df1 = transform(df, n)
df1.show(truncate=False)
#+-------+------------------------------------+
#|Column |Content                             |
#+-------+------------------------------------+
#|Animals|[[Cat,1], [Dog,1], [Elephant,1]]    |
#|Food   |[[Banana,2], [Meat,2], [Fish,1]]    |
#|Home   |[[Jungle,2], [Desert,1], [Garden,1]]|
#+-------+------------------------------------+

这与您要求的完全输出不同,但它可能足以满足您的需求。 (Spark 没有您描述的元组。)这是新模式:

df1.printSchema()
#root
# |-- Column: string (nullable = false)
# |-- Content: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- Content: string (nullable = true)
# |    |    |-- count: long (nullable = false)

【讨论】:

  • 感谢您的解决方案,它似乎完全满足了我的需求。但是它会导致错误Union can only be performed on tables with the compatible column types. array&lt;struct&lt;Content:boolean,count:bigint&gt;&gt; &lt;&gt; array&lt;struct&lt;Content:string,count:bigint&gt;&gt; at the second column of the 2th table。我不明白布尔类型的来源。
  • 太棒了!多谢!唯一的问题是我有Animals|[[Content: Cat, count: 1], [Content: Dog, count: 1], [Content; Elephant, count: 1]] 是否可以删除结构中的标题?即使我删除了别名,仍然有一个标题。
  • @Maxbester 您可以将struct 更改为array(在from pyspark.sql.functions import array 之后),这将为您留下WrappedArray。我不确定为什么这对您很重要 - 最终目标是什么?
  • 好的,我会试试的。抱歉,我以为我在最初的问题中解释了目标。实际上,目的是验证数据集的创建。我想确保数据在每一列中都是相关的。显然我的数据集比我举的例子要大得多。
猜你喜欢
  • 2015-06-01
  • 1970-01-01
  • 1970-01-01
  • 2021-03-28
  • 2018-12-16
  • 1970-01-01
  • 1970-01-01
  • 2022-07-14
  • 2019-12-27
相关资源
最近更新 更多