【问题标题】:Updating a column of a dataframe based on another column value根据另一列值更新数据框的列
【发布时间】:2017-11-11 06:29:31
【问题描述】:

我正在尝试使用 Scala 中另一列的值来更新列的值。

这是我的数据框中的数据:

+-------------------+------+------+-----+------+----+--------------------+-----------+
|UniqueRowIdentifier|   _c0|   _c1|  _c2|   _c3| _c4|                 _c5|isBadRecord|
+-------------------+------+------+-----+------+----+--------------------+-----------+
|                  1|     0|     0| Name|     0|Desc|                    |          0|
|                  2|  2.11| 10000|Juice|     0| XYZ|2016/12/31 : Inco...|          0|
|                  3|-0.500|-24.12|Fruit|  -255| ABC| 1994-11-21 00:00:00|          0|
|                  4| 0.087|  1222|Bread|-22.06|    | 2017-02-14 00:00:00|          0|
|                  5| 0.087|  1222|Bread|-22.06|    |                    |          0|
+-------------------+------+------+-----+------+----+--------------------+-----------+

这里的列 _c5 包含一个不正确的值(Row2 中的值具有字符串 Incorrect),基于此我想将其 isBadRecord 字段更新为 1。

有没有办法更新这个字段?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    您可以使用withColumn api 并使用满足您需要的functions 之一为不良记录填写1。

    对于您的情况,您可以编写 udf 函数

    def fillbad = udf((c5 : String) => if(c5.contains("Incorrect")) 1 else 0)
    

    并将其称为

    val newDF = dataframe.withColumn("isBadRecord", fillbad(dataframe("_c5")))
    

    【讨论】:

    • 如何使用 withColumn api 检查一列的值并根据该值更新另一列?
    • 更新我的答案请检查
    • 与其创建一个UDF来检查一个简单的'contains',不如在withColumn本身中使用'contains'!
    • 我已经用过很多次了。 df.withColumn("isBadRecord", when(col("_c5").contains("Incorrect"),1).otherwise(0))
    • @AvikAggarwal 而不是对有效答案发表评论和投票,为什么不用另一个答案来回答这个问题。我再次感谢您对一个有效的答案投反对票
    【解决方案2】:

    我建议您像在 SQL 中那样考虑它,而不是考虑更新它;您可以执行以下操作:

    import org.spark.sql.functions.when
    
    val spark: SparkSession = ??? // your spark session
    val df: DataFrame = ??? // your dataframe
    
    import spark.implicits._
    
    df.select(
      $"UniqueRowIdentifier", $"_c0", $"_c1", $"_c2", $"_c3", $"_c4",
      $"_c5", when($"_c5".contains("Incorrect"), 1).otherwise(0) as "isBadRecord")
    

    这是一个独立的脚本,您可以将其复制并粘贴到 Spark shell 上以在本地查看结果:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    
    sc.setLogLevel("ERROR")
    
    val schema = 
      StructType(Seq(
        StructField("UniqueRowIdentifier", IntegerType),
        StructField("_c0", DoubleType),
        StructField("_c1", DoubleType),
        StructField("_c2", StringType),
        StructField("_c3", DoubleType),
        StructField("_c4", StringType),
        StructField("_c5", StringType),
        StructField("isBadRecord", IntegerType)))
    
    val contents =
      Seq(
        Row(1,  0.0  ,     0.0 ,  "Name",    0.0, "Desc",                       "", 0),
        Row(2,  2.11 , 10000.0 , "Juice",    0.0,  "XYZ", "2016/12/31 : Incorrect", 0),
        Row(3, -0.5  ,   -24.12, "Fruit", -255.0,  "ABC",    "1994-11-21 00:00:00", 0),
        Row(4,  0.087,  1222.0 , "Bread",  -22.06,    "",    "2017-02-14 00:00:00", 0),
        Row(5,  0.087,  1222.0 , "Bread",  -22.06,    "",                       "", 0)
      )
    
    val df = spark.createDataFrame(sc.parallelize(contents), schema)
    
    df.show()
    
    val withBadRecords =
      df.select(
        $"UniqueRowIdentifier", $"_c0", $"_c1", $"_c2", $"_c3", $"_c4",
        $"_c5", when($"_c5".contains("Incorrect"), 1).otherwise(0) as "isBadRecord")
    
    withBadRecords.show()
    

    其相关输出如下:

    +-------------------+-----+-------+-----+------+----+--------------------+-----------+
    |UniqueRowIdentifier|  _c0|    _c1|  _c2|   _c3| _c4|                 _c5|isBadRecord|
    +-------------------+-----+-------+-----+------+----+--------------------+-----------+
    |                  1|  0.0|    0.0| Name|   0.0|Desc|                    |          0|
    |                  2| 2.11|10000.0|Juice|   0.0| XYZ|2016/12/31 : Inco...|          0|
    |                  3| -0.5| -24.12|Fruit|-255.0| ABC| 1994-11-21 00:00:00|          0|
    |                  4|0.087| 1222.0|Bread|-22.06|    | 2017-02-14 00:00:00|          0|
    |                  5|0.087| 1222.0|Bread|-22.06|    |                    |          0|
    +-------------------+-----+-------+-----+------+----+--------------------+-----------+
    
    +-------------------+-----+-------+-----+------+----+--------------------+-----------+
    |UniqueRowIdentifier|  _c0|    _c1|  _c2|   _c3| _c4|                 _c5|isBadRecord|
    +-------------------+-----+-------+-----+------+----+--------------------+-----------+
    |                  1|  0.0|    0.0| Name|   0.0|Desc|                    |          0|
    |                  2| 2.11|10000.0|Juice|   0.0| XYZ|2016/12/31 : Inco...|          1|
    |                  3| -0.5| -24.12|Fruit|-255.0| ABC| 1994-11-21 00:00:00|          0|
    |                  4|0.087| 1222.0|Bread|-22.06|    | 2017-02-14 00:00:00|          0|
    |                  5|0.087| 1222.0|Bread|-22.06|    |                    |          0|
    +-------------------+-----+-------+-----+------+----+--------------------+-----------+
    

    【讨论】:

    • 如果_c5 具有2016/12/31 : Incorrect data 作为值,答案是否仍然有效?我猜不是。
    • 你说得对,我修正了我的答案以反映原始问题中的要求。
    【解决方案3】:

    最好的选择是创建一个 UDF 并尝试将其转换为日期格式。 如果可以转换则返回 0 否则返回 1

    即使您的日期格式不正确也可以使用

          val spark = SparkSession.builder().master("local")
            .appName("test").getOrCreate()
    
          import spark.implicits._
    
    //create test dataframe
          val data = spark.sparkContext.parallelize(Seq(
            (1,"1994-11-21 Xyz"),
            (2,"1994-11-21 00:00:00"),
            (3,"1994-11-21 00:00:00")
          )).toDF("id", "date")
    
    // create udf which tries to convert to date format
    // returns 0 if success and returns 1 if failure 
          val check = udf((value: String) => {
            Try(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(value)) match {
              case Success(d) => 1
              case Failure(e) => 0
            }
          })
    
    // Add column 
          data.withColumn("badData", check($"date")).show
    

    希望这会有所帮助!

    【讨论】:

    • 如果字段值中的单词与不正确的单词不同,上述答案是否有效?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-26
    • 2019-11-23
    • 1970-01-01
    • 2019-10-14
    • 1970-01-01
    • 2020-05-28
    相关资源
    最近更新 更多