【问题标题】:Data Type validation in pysparkpyspark 中的数据类型验证
【发布时间】:2019-01-23 16:30:06
【问题描述】:

我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有未与架构确认的数据。

stringDf = sparkSession.createDataFrame(
    [
        ("11/25/1991","1"),
        ("11/24/1991", None),
        ("11/30/1991","a")
    ], 
    ['dateAsString','intAsString']
)

这是我的stringDf,有两列。

+------------+-----------+
|dateAsString|intAsString|
+------------+-----------+
|  11/25/1991|          1|
|  11/24/1991|       null|
|  11/30/1991|          a|
+------------+-----------+

我想为数据框创建一个名为dataTypeValidationErrors 的新列,以捕获此数据集中可能存在的所有错误。使用 pyspark 实现这一目标的最佳方法是什么?

+------------+-----------+------------------------+
|dateAsString|intAsString|dataTypeValidationErrors|
+------------+-----------+------------------------+
|  11/25/1991|          1|None                    |
|  11/24/1991|       null|None                    |
|  11/30/1991|          a|Not a valid Number      |
+------------+-----------+------------------------+

【问题讨论】:

    标签: pyspark pyspark-sql


    【解决方案1】:

    您可以尝试将cast 列设置为所需的DataType。如果存在不匹配或错误,将返回null。在这些情况下,您需要验证原始值不是 null,如果不是,则说明有错误。

    • 使用pyspark.sql.functions.when() 来测试转换后的列是否为null 并且原始值不是null
    • 如果这是True,则使用字符串文字"Not a valid Number" 作为列值。否则返回字符串"None"

    例如:

    import pyspark.sql.functions as f
    
    stringDf.withColumn(
            "dataTypeValidationErrors",
            f.when(
                f.col("intAsString").cast("int").isNull() & f.col("intAsString").isNotNull(),
                f.lit("Not a valid Number")
            ).otherwise(f.lit("None"))
        )\
        .show()
    #+------------+-----------+------------------------+
    #|dateAsString|intAsString|dataTypeValidationErrors|
    #+------------+-----------+------------------------+
    #|  11/25/1991|          1|                    None|
    #|  11/24/1991|       null|                    None|
    #|  11/30/1991|          a|      Not a valid Number|
    #+------------+-----------+------------------------+
    

    您也可以将其扩展到多列:

    假设您还有一行包含无效的dateAsString 值:

    stringDf = spark.createDataFrame(
        [
            ("11/25/1991","1"),
            ("11/24/1991", None),
            ("11/30/1991","a"),
            ("13.14.15", "b")
        ], 
        ['dateAsString','intAsString']
    )
    

    使用字典定义每一列的转换:

    conversions = {
        'dateAsString':lambda c: f.from_unixtime(f.unix_timestamp(c,"MM/dd/yyyy")).cast("date"),
        'intAsString':lambda c: f.col(c).cast('int')
    }
    
    stringDf.withColumn(
            "dataTypeValidationErrors",
            f.concat_ws(", ",
                *[
                    f.when(
                        v(k).isNull() & f.col(k).isNotNull(),
                        f.lit(k + " not valid")
                    ).otherwise(f.lit(None))
                    for k, v in conversions.items()
                ]
            )
        )\
        .show(truncate=False)
    #+------------+-----------+---------------------------------------------+
    #|dateAsString|intAsString|dataTypeValidationErrors                     |
    #+------------+-----------+---------------------------------------------+
    #|11/25/1991  |1          |                                             |
    #|11/24/1991  |null       |                                             |
    #|11/30/1991  |a          |intAsString not valid                        |
    #|13.14.15    |b          |dateAsString not valid, intAsString not valid|
    #+------------+-----------+---------------------------------------------+
    

    或者如果你只是想知道一行是否有错误,而不需要知道细节:

    stringDf.withColumn(
            "dataTypeValidationErrors",
            f.when(
                reduce(
                    lambda a, b: a|b,
                    (v(k).isNull() & f.col(k).isNotNull() for k, v in conversions.items())
                ),
                f.lit("Validation Error")
            ).otherwise(f.lit("None"))     
        )\
        .show(truncate=False)
    #+------------+-----------+------------------------+
    #|dateAsString|intAsString|dataTypeValidationErrors|
    #+------------+-----------+------------------------+
    #|11/25/1991  |1          |None                    |
    #|11/24/1991  |null       |None                    |
    #|11/30/1991  |a          |Validation Error        |
    #|13.14.15    |b          |Validation Error        |
    #+------------+-----------+------------------------+
    

    【讨论】:

    • 非常感谢保罗。但是,我想知道当数据框变得太宽(100 列)时是否会对性能产生影响。创建另一组列(已转换)会有什么问题吗?
    • @AnandKannan 我更新了代码以删除“temp”列,但从执行的角度来看,它实际上并没有什么不同。这不应该对性能产生任何影响,而且 100 列对 Spark 来说没什么大不了的。
    猜你喜欢
    • 2018-02-19
    • 2012-04-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多