【问题标题】:Pyspark dataframe - select rows that failsPyspark 数据框 - 选择失败的行
【发布时间】:2018-01-30 02:22:13
【问题描述】:

是否可以对数据框执行一组操作(添加新列、替换一些现有值等)并且不会在第一个失败的行上快速失败,而是执行完整的转换并单独返回已处理错误的行?

示例: 更像是伪代码,但思路一定要清楚:

df.withColumn('PRICE_AS_NUM', to_num(df["PRICE_AS_STR"]))

to_num - 是我将字符串转换为数字的自定义函数。

假设我有一些价格无法转换为数字的记录 - 我想在单独的数据框中获取这些记录。

我看到了一种方法,但它会使代码有点难看(而且效率不高): 使用 try catch 进行过滤 - 如果发生异常 - 将这些记录过滤到单独的 df.. 如果我有很多这样的转换怎么办......有更好的方法吗?

【问题讨论】:

  • 您能否提供一个小示例,其中包含一些示例输入、操作、它们为什么会失败以及所需的输出?请参阅this post
  • 你可以做一个原生的python try/except
  • @pault,添加示例。
  • @pratiklodha 我无法完成整个过程 - 它会在第一次记录时失败。我都想要。

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

我认为一种方法是使用返回布尔值的try/except 函数来包装您的转换。然后使用when()otherwise() 过滤布尔值。例如:

def to_num_wrapper(inputs):
    try:
        to_num(inputs)
        return True
    except:
        return False

from pyspark.sql.functions import when
df.withColumn('PRICE_AS_NUM',
              when(
                    to_num_wrapper(df["PRICE_AS_STR"]),
                    to_num(df["PRICE_AS_STR"])
              ).otherwise('FAILED')
)

然后您可以过滤值为'FAILED'的列。

【讨论】:

  • 是的,谢谢,我有这样的想法!不幸的是,我需要对许多列执行此操作,这些列的派生数据类型也没有多大意义......
【解决方案2】:

首选选项

总是更喜欢内置 SQL 函数而不是 UDF。执行起来很安全,而且比 Python UDF 快得多。作为奖励,他们遵循 SQL 语义 - 如果在线上有问题,输出为 NULL - 未定义。

如果你选择 UDF

遵循与内置函数相同的方法。

def safe_udf(f, dtype):
    def _(*args):
        try:
            return f(*args)
        except:
            pass
    return udf(_, dtype)

to_num_wrapper = safe_udf(lambda x: float(x), "float")

df = spark.createDataFrame([("1.123", ), ("foo", )], ["str"])

df.withColumn("num", to_num_wrapper("str")).show()
# +-----+-----+
# |  str|  num|
# +-----+-----+
# |1.123|1.123|
# |  foo| null|
# +-----+-----+

虽然吞咽异常可能是违反直觉的,但这只是遵循 SQL 约定的问题。

无论你选择哪一个

一旦您使用上述其中一项进行调整,处理格式错误的数据只需应用DataFrameNaFunctions.na.drop.na.replace)即可。

【讨论】:

    猜你喜欢
    • 2020-02-19
    • 2020-08-02
    • 2018-03-30
    • 1970-01-01
    • 2018-09-09
    • 1970-01-01
    • 2021-11-23
    • 2023-02-01
    • 1970-01-01
    相关资源
    最近更新 更多