【问题标题】:Create new pyspark DataFrame column by concatenating values of another column based on a conditional通过基于条件连接另一列的值来创建新的 pyspark DataFrame 列
【发布时间】:2018-05-09 18:38:54
【问题描述】:

我在pyspark 中有一个数据框,如下所示

df.show()

+-------+--------------------+--------------------+
| Dev_No|               model|              Tested|
+-------+--------------------+--------------------+
|BTA16C5|          Windows PC|                   N|
|BTA16C5|                 SRL|                   N|
|BTA16C5|     Hewlett Packard|                   N|
|CTA16C5|     Android Devices|                   Y|
|CTA16C5|     Hewlett Packard|                   N|
|4MY16A5|               Other|                   N|
|4MY16A5|               Other|                   N|
|4MY16A5|              Tablet|                   Y|
|4MY16A5|               Other|                   N|
|4MY16A5|           Cable STB|                   Y|
|4MY16A5|               Other|                   N|
|4MY16A5|          Windows PC|                   Y|
|4MY16A5|          Windows PC|                   Y|
|4MY16A5|         Smart Watch|                   Y|
+-------+--------------------+--------------------+

现在使用上面的数据框,我想用一个名为Tested_devicesnewcolumn 创建下面的数据框,并用每个Dev_No 选择model 的值填充列,其中TestedY并将所有值填充为逗号分隔。

df1.show()

+-------+--------------------+--------------------+------------------------------------------------------+
| Dev_No|               model|              Tested|                                        Tested_devices|
+-------+--------------------+--------------------+------------------------------------------------------+
|BTA16C5|          Windows PC|                   N|                                                      |
|BTA16C5|                 SRL|                   N|                                                      |  
|BTA16C5|     Hewlett Packard|                   N|                                                      |
|CTA16C5|     Android Devices|                   Y|                                       Android Devices|
|CTA16C5|     Hewlett Packard|                   N|                                                      |      
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|              Tablet|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch| 
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|           Cable STB|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|               Other|                   N|                                                      |
|4MY16A5|          Windows PC|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|          Windows PC|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
|4MY16A5|         Smart Watch|                   Y| Tablet, Cable STB,Windows PC, Windows PC, Smart Watch|
+-------+--------------------+--------------------+------------------------------------------------------+

我尝试了类似下面的方法来选择Dev_Nomodel 其中TestedY

a = df.select("Dev_No", "model"), when(df.Tested == 'Y')

我无法得到结果。它给了我以下错误

TypeError: when() takes exactly 2 arguments (1 given)

我怎样才能达到我想要的效果

【问题讨论】:

  • 是否需要 Dev_No 在输出中重复?一个更简单的解决方案可能是仅过滤到经过测试的设备,然后在 Dev_No 上分组并将模型收集为一个列表:
  • 您尝试的错误是您应该使用 where 过滤数据框:a = df.where(df.Tested == 'Y').select("Dev_No", "model")

标签: apache-spark pyspark spark-dataframe


【解决方案1】:

更新

对于 spark 1.6,您将需要一种替代方法。在不使用udf 或任何Window 函数的情况下执行此操作的一种方法是使用收集的值创建第二个临时DataFrame,然后将其连接回原始DataFrame。

两个 Dev_NoTested 的第一组,并使用 concat_wscollect_list 聚合。聚合后,仅过滤已测试设备的 DataFrame。

import pyspark.sql.functions as f

# create temporary DataFrame
df2 = df.groupBy('Dev_No', 'Tested')\
    .agg(f.concat_ws(", ", f.collect_list('model')).alias('Tested_devices'))\
    .where(f.col('Tested') == 'Y')

df2.show(truncate=False)
#+-------+------+------------------------------------------------------+
#|Dev_No |Tested|Tested_devices                                        |
#+-------+------+------------------------------------------------------+
#|CTA16C5|Y     |Android Devices                                       |
#|4MY16A5|Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+------+------------------------------------------------------+

现在使用Dev_NoTested 列作为连接键,对dfdf2 进行左连接:

df.join(df2, on=['Dev_No', 'Tested'], how='left')\
    .select('Dev_No', 'model', 'Tested', 'Tested_devices')\
    .show(truncate=False)

在末尾使用select 的目的是为了显示与原始DataFrame 相同的顺序获取列-如果您选择,可以删除此步骤。

这将导致以下输出(与以下输出相同(带有concat_ws):

#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                        |
#+-------+---------------+------+------------------------------------------------------+
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|4MY16A5|Other          |N     |null                                                  |
#|CTA16C5|Hewlett Packard|N     |null                                                  |
#|BTA16C5|Windows PC     |N     |null                                                  |
#|BTA16C5|SRL            |N     |null                                                  |
#|BTA16C5|Hewlett Packard|N     |null                                                  |
#|CTA16C5|Android Devices|Y     |Android Devices                                       |
#|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#+-------+---------------+------+------------------------------------------------------+

原答案:(适用于更高版本的 Spark)

您可以通过使用两个 pyspark.sql.functions.when() 语句来实现这一点 - 其中一个在通过 Window 调用 pyspark.sql.functions.collect_list() 中,利用默认 nulldoes not get added to the list 的事实:

from pyspark.sql import Window
import pyspark.sql.functions as f

df.select(
    "*",
    f.when(
        f.col("Tested") == "Y",
        f.collect_list(
            f.when(
                f.col("Tested") == "Y",
                f.col('model')
            )
        ).over(Window.partitionBy("Dev_No"))
    ).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+--------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                          |
#+-------+---------------+------+--------------------------------------------------------+
#|BTA16C5|Windows PC     |N     |null                                                    |
#|BTA16C5|SRL            |N     |null                                                    |
#|BTA16C5|Hewlett Packard|N     |null                                                    |
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Tablet         |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Cable STB      |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Other          |N     |null                                                    |
#|4MY16A5|Windows PC     |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Windows PC     |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|4MY16A5|Smart Watch    |Y     |[Tablet, Cable STB, Windows PC, Windows PC, Smart Watch]|
#|CTA16C5|Android Devices|Y     |[Android Devices]                                       |
#|CTA16C5|Hewlett Packard|N     |null                                                    |
#+-------+---------------+------+--------------------------------------------------------+

如果您希望您的输出与您在问题中显示的完全一样 - 作为逗号分隔值的字符串而不是列表和空字符串而不是 null- 您可以稍微修改如下:

使用pyspark.sql.functions.concat_wscollect_list 的输出连接成一个字符串。我使用", " 作为分隔符。这相当于在 python 中做", ".join(some_list)。接下来,我们将.otherwise(f.lit("")) 添加到外部when() 调用的末尾,以指定如果条件为False,我们希望返回一个文字空字符串。

df.select(
    "*",
    f.when(
        f.col("Tested") == "Y",
        f.concat_ws(
            ", ",
            f.collect_list(
                f.when(
                    f.col("Tested") == "Y",
                    f.col('model')
                )
            ).over(Window.partitionBy("Dev_No"))
        )
    ).otherwise(f.lit("")).alias("Tested_devices")
).show(truncate=False)
#+-------+---------------+------+------------------------------------------------------+
#|Dev_No |model          |Tested|Tested_devices                                        |
#+-------+---------------+------+------------------------------------------------------+
#|BTA16C5|Windows PC     |N     |                                                      |
#|BTA16C5|SRL            |N     |                                                      |
#|BTA16C5|Hewlett Packard|N     |                                                      |
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Other          |N     |                                                      |
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
#|CTA16C5|Android Devices|Y     |Android Devices                                       |
#|CTA16C5|Hewlett Packard|N     |                                                      |
#+-------+---------------+------+------------------------------------------------------+

使用pyspark-sql语法,上面第一个例子等价于:

df.registerTempTable("df")
query = """
 SELECT *, 
        CASE 
          WHEN Tested = 'Y' 
          THEN COLLECT_LIST(
            CASE 
              WHEN Tested = 'Y' 
              THEN model
            END
          ) OVER (PARTITION BY Dev_No) 
        END AS Tested_devices
   FROM df
"""
sqlCtx.sql(query).show(truncate=False)

【讨论】:

  • 我收到java.lang.UnsupportedOperationException: 'collect_list(CASE WHEN ('Tested = Y) THEN 'model) is not supported in a window operation 错误
  • 什么版本的火花?
  • 我的版本是 spark 1.6
  • @user9367133 我在顶部做了一个更新,应该适用于 1.6
  • 如果要从列表中删除重复项,还有pyspark.sql.functions.collect_set()
【解决方案2】:

为了清晰和解释而发表评论

pyspark > 1.6

#window function to group by Dev_No
from pyspark.sql import Window
windowSpec = Window.partitionBy("Dev_No")

from pyspark.sql import functions as f
from pyspark.sql import types as t
#udf function to change the collected list to string and also to check if Tested column is Y or N
@f.udf(t.StringType())
def populatedUdfFunc(tested, list):
    if(tested == "Y"):
        return ", ".join(list)
    else:
        return ""
#collecting models when Tested is Y using window function defined above
df.withColumn("Tested_devices", populatedUdfFunc(f.col("Tested"), f.collect_list(f.when(f.col("Tested") == "Y", f.col("model")).otherwise(None)).over(windowSpec))).show(truncate=False)

这应该给你

+-------+---------------+------+------------------------------------------------------+
|Dev_No |model          |Tested|Tested_devices                                        |
+-------+---------------+------+------------------------------------------------------+
|BTA16C5|Windows PC     |N     |                                                      |
|BTA16C5|SRL            |N     |                                                      |
|BTA16C5|Hewlett Packard|N     |                                                      |
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Tablet         |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Cable STB      |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Other          |N     |                                                      |
|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Windows PC     |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|4MY16A5|Smart Watch    |Y     |Tablet, Cable STB, Windows PC, Windows PC, Smart Watch|
|CTA16C5|Android Devices|Y     |Android Devices                                       |
|CTA16C5|Hewlett Packard|N     |                                                      |
+-------+---------------+------+------------------------------------------------------+

火花 = 1.6

对于 pyspark 1.6,collect_list 不能与 window 函数一起使用,并且 SqlContext 中没有定义 collect_list 函数。所以你将不得不不使用窗口函数并使用 HiveContext 而不是 SQLContext

from pyspark.sql import functions as f
from pyspark.sql import types as t
#udf function to change the collected list to string and also to check if Tested column is Y or N
def populatedUdfFunc(list):
    return ", ".join(list)

populateUdf = f.udf(populatedUdfFunc, t.StringType())

#collecting models when Tested is Y using window function defined above
tempdf = df.groupBy("Dev_No").agg(populateUdf(f.collect_list(f.when(f.col("Tested") == "Y", f.col("model")).otherwise(None))).alias("Tested_devices"))
df.join(
    tempdf,
    (df["Dev_No"] == tempdf["Dev_No"]) & (df["Tested"] == f.lit("Y")), "left").show(truncate=False)

你会得到和上面一样的输出

【讨论】:

    猜你喜欢
    • 2020-10-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-22
    • 2018-03-16
    • 1970-01-01
    • 1970-01-01
    • 2017-05-20
    相关资源
    最近更新 更多