【发布时间】:2018-04-22 00:02:52
【问题描述】:
我有一个用例,我需要对已加载的数据进行一些数据更正。鉴于元数据位于 hbase 中,其中一个 colfamily 下的逻辑主键和另一个 column family 下的 columnUpdates。假设我有一个过滤后的数据框,其中包含一条记录,我需要对其进行更新(使用 spark sql 过滤)。colNames 和 Colvalues 在 java Map 中。我知道我们可以应用 withColunmm 来更新或添加一个新的列到现有的 DF,但在这种情况下,我需要根据我的元数据多次应用 withColumn,即需要更正数据的列的数量。我不能通过迭代在 for 循环中执行此操作map 因为 Dataframes 是不可变的,我也不鼓励使用 switch case。还有一个限制,不应该使用 scala API。
Dataset<Row> existingdata = sparksession.read
.format(com.databricks.spark.avro)
.load(myhdfslocation);
Map<byte[],byte[]> colUpdates = result.getFamily("TK")//result of hbase get
Set<byte[]> colUpdateKeys = colUpdates.keySet();
for(byte[] eachkey : colUpdateKeys ){
Dataset<Row> updatedDF =
existingdata.withColumn(
existingdata.col(Bytes.toString(eachkey)),
"value from themetadatamap"
);
}
到目前为止,我有两种方法,一种是使用 switch case(这不是最佳的,因为它不是保留这么多 switch case 的好方法),另一种是将 hbase 元数据作为 spark 数据帧读取,然后应用火花连接以获得结果数据集。 如果有人可以建议实现此用例的最佳方法,那将非常有帮助。:)
【问题讨论】:
-
"value from themetadatamap"是否来自同一个existingdata数据框? -
不。它是来自 colupdates 映射的值。此映射具有要更新的列名作为键,值是列值。
-
Existingdata =existingdata.withColumn(.....) 如果数据帧在无法更改为新值时被初始化,则此行将引发重新初始化错误,因为数据帧是不可变的。
-
数据框是不可变的,这意味着您不能更改数据框对象。这并不意味着您不能用不同的对象替换数据框变量的对象值。
existingdata = existingdata.withColumn()会在旧数据框的基础上创建一个新的数据框,然后覆盖变量的值。 -
即使我以类似的方式思考并保留了类似的代码,但在运行时它会引发重新初始化错误..您可以通过执行类似的代码来尝试 spark shell..它是甚至不允许更改参考变量..无论如何我会再次运行并让你知道。
标签: java apache-spark apache-spark-sql apache-spark-dataset