【问题标题】:Get value from Pyspark Column and compare it to a Python dictionary从 Pyspark Column 获取值并将其与 Python 字典进行比较
【发布时间】:2021-04-23 13:48:45
【问题描述】:

所以我有一个 pyspark 数据框,我想添加另一列以使用 Section_1 列中的值并在 python 字典中找到其对应的值。所以基本上使用 Section_1 单元格中的值作为键,然后在新列中填写 python 字典中的值,如下所示。

原始数据框

DataId ObjId Name Object Section_1
My data Data name Object name rd.111 rd.123

Python 字典

object_map= {'rd.123' : 'rd.567'}

第 1 节的值为 rd.123,我将在字典中搜索键 'rd.123' 并希望返回 rd.567 的值并将其放入新列中

所需的数据帧

DataId ObjId Name Object Section_1 Section_2
My data Data name Object name rd.111 rd.123 rd.567

现在我的当前代码出现了这个错误,我真的不知道我做错了什么,因为我不熟悉 pyspark

您的代码中对 Column 对象的调用不正确。请 检查您的代码。

这是我目前使用的代码,其中 object_map 是 python 字典。

test_df = output.withColumn('Section_2', object_map.get(output.Section_1.collect()))

【问题讨论】:

  • 您可以尝试output.Section_1.collect()[0][0],但这仅适用于您的数据框只有 1 行
  • 感谢您的建议,但我有多个行。
  • 试试this answer?
  • 这可能已经完成了,但我发现有一个空值并给出了这个错误
  • 不能使用 null 作为映射键。

标签: python apache-spark dictionary pyspark apache-spark-sql


【解决方案1】:

你可以试试这个(改编自 this answer 并添加了 null 处理):

from itertools import chain
from pyspark.sql.functions import create_map, lit, when

object_map = {'rd.123': 'rd.567'}
mapping_expr = create_map([lit(x) for x in chain(*object_map.items())])

df1 = df.filter(df['Section_1'].isNull()).withColumn('Section_2', F.lit(None))

df2 = df.filter(df['Section_1'].isNotNull()).withColumn(
    'Section_2', 
    when(
        df['Section_1'].isNotNull(), 
        mapping_expr[df['Section_1']]
    )
)

result = df1.unionAll(df2)

【讨论】:

  • 我认为这非常接近,但仍然给我带来了空值问题
  • 如果您有空条目,则无法使用字典将其映射到任何内容。
  • 那么我的代码应该已经这样做了。否则默认为无。
  • 避免使用df = df.withColumn(...)。为每个转换后的数据帧使用一个新变量。
  • 抱歉,您可以试试编辑后的答案吗?
猜你喜欢
  • 2013-08-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-21
相关资源
最近更新 更多