【问题标题】:How to merge edits from one dataframe into another dataframe in Spark?如何将一个数据框的编辑合并到 Spark 中的另一个数据框?
【发布时间】:2018-06-04 05:19:54
【问题描述】:

我有一个包含 150 列和多行的数据框 df1。我还有一个数据框 df2 具有相同的架构,但包含应该应用于 df1 的编辑的行很少(有一个关键列 id确定要更新的行)。 df2 仅包含填充了更新的列。其他列为空。我想要做的是用数据帧 df2 中的相应行更新 df1 中的行,方法如下:

  • 如果 df2 中的一列为空,则不应导致 df1 中的任何变化
  • 如果 df2 中的列包含波浪号“~”,则应导致 df1 中的该列无效
  • 否则 df1 列中的值应替换为 df2 中的值

我怎样才能做到最好?可以在不列出所有列而是迭代它们的情况下以通用方式完成吗?可以使用 dataframe API 完成还是需要切换到 RDD?

(当然,通过更新数据框 df1 我的意思是创建一个新的、更新的数据框。)

示例

假设架构是:id:Int, name:String, age: Int。

df1 是:

1,"Greg",18
2,"Kate",25
3,"Chris",30

df2 是:

1,"Gregory",null
2,~,26

更新后的数据框应如下所示:

1,"Gregory",18
2,null,26
3,"Chris",30

【问题讨论】:

  • 你能添加来自 df1 和 df2 的样本数据吗?这样会更容易理解。
  • 这是一个非常容易理解的问题,所以我很惊讶您需要一个示例,但我为您提供了一个。
  • 您熟悉 SQL 连接吗?
  • 是的。您可以在不列出所有 150 列的情况下使用连接吗?给我看。

标签: apache-spark dataframe updates


【解决方案1】:

您还可以使用 case 或使用全外连接合并两个数据框。请参阅下面的链接以获取说明。 Spark incremental loading overwrite old record

【讨论】:

    【解决方案2】:

    我想出了如何通过中间转换为 RDD 来做到这一点。首先,创建一个映射 idsToEdits,其中键是行 ID,值是列号到值的映射(仅限非空值)。

    val idsToEdits=df2.rdd.map{row=>
      (row(0),
       row.getValuesMap[AnyVal](row.schema.fieldNames.filterNot(colName=>row.isNullAt(row.fieldIndex(colName))))
      .map{case (k,v)=> (row.fieldIndex(k),if(v=="~") null else v)} )
    }.collectAsMap()
    

    broadast 映射并定义更新行的 editRow 函数。

    val idsToEditsBr=sc.broadcast(idsToEdits)
    import org.apache.spark.sql.Row
    val editRow:Row=>Row={ row =>
      idsToEditsBr
        .value
        .get(row(0))
        .map{edits => Row.fromSeq(edits.foldLeft(row.toSeq){case (rowSeq,
    (idx,newValue))=>rowSeq.updated(idx,newValue)})}
        .getOrElse(row)
    }
    

    最后,在从 df1 派生的 RDD 上使用该函数并转换回数据帧。

    val updatedDF=spark.createDataFrame(df1.rdd.map(editRow),df1.schema)
    

    【讨论】:

    • 以上解决了问题,但如果有人可以向我展示类似但仅使用数据框或 SQL 完成的东西,我会接受他们的回答。
    【解决方案3】:

    听起来您的问题是如何在不明确命名所有列的情况下执行此操作,因此我假设您在加入后有一些“doLogic”udf 函数或数据框函数来执行您的逻辑。

    import org.apache.spark.sql.types.StringType
    
    val cols = df1.schema.filterNot(x => x.name == "id").map({ x =>
        if (x.dataType == StringType) {
            doLogicUdf(col(x), col(x + "2"))) 
        } else {
            when(col(x + "2").isNotNull, col(x + "2")).otherwise(col(x))
        }
    }) :+ col("id")
    val df2 = df2.select(df2.columns.map( x=> col(x).alias(x+"2")) : _*)) 
    df1.join(df2, col("id") ===col("id2") , "inner").select(cols : _*) 
    

    【讨论】:

    • 有趣的方法@ayplam。我不介意遍历所有列,这实际上就是您在这里使用 map 所做的。我只是不想明确列出所有列,仅此而已。您假设 df2 中的所有列名都与 df1 中的相同是正确的。我认为我的陈述“具有相同的架构”涵盖了这一点。
    • 我对你的代码 sn-p 中的行顺序有点困惑。您不希望第一行排在最后吗?
    • 你测试过那个代码吗?我想 doLogicUdf 的底层函数应该至少接受 2 个 Any 或 AnyVal 类型的参数,因为不同的列可以是不同的类型。我不认为 Any 类型的参数在我之前尝试过 UDF 时对我有用......如果你能在不同类型的列上测试它,我将不胜感激,例如就像我提供的示例一样简单,以确认您的方法有效。如果是,我会接受它,因为这将是最优雅的解决方案。
    • 我没有机会测试代码 - 但它应该让您非常接近您想要的解决方案。第一行只是将 doLogicUdf 中 df1 和 df2 的列配对,所以只要它出现在“.select”之前就没有关系,因为什么都不执行。我也无法进行 AnyVal UDF。由于“tilda”逻辑自动创建列 StringType,因此我对其进行了轻微编辑,因此所有其他列仅使用项目符号 (1) + (3) 中的规则
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-26
    • 1970-01-01
    • 2017-10-21
    • 1970-01-01
    • 1970-01-01
    • 2016-02-03
    相关资源
    最近更新 更多