【发布时间】:2017-06-13 17:33:42
【问题描述】:
所以我读了这个答案Spark: how to get the number of written rows? 和How to get the number of records written (using DataFrameWriter's save operation)?,它真的很有帮助,对我的意见很有用。
但是出于某种原因,即使我在代码中多次写入 parquet,它也总是没有输出。 (taskEnd.taskMetrics.outputMetrics= None) 始终没有。
添加了带有 accumulables 的示例代码,但输出仍然没有为我提供输入行的正确结果,它似乎对该 accumulable 正常工作。
我使用的是 Scala 和 Spark 1.6。
我有 2 个问题。
如何使用 spark 1.6 解决此问题
使用较新版本的 spark 是否可以正常工作。
附上我在 Spark 1.6 中的日志
var sc = new SparkContext(sparkConf)
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
if (taskEnd.taskInfo.accumulables.nonEmpty) {
for (i <- 0 until 6) {
println()
if (taskEnd.taskInfo.accumulables.length > i) {
println("value of i " + i)
println("name = " + taskEnd.taskInfo.accumulables(i).name)
println("value = " + taskEnd.taskInfo.accumulables(i).value)
}
if (taskEnd.taskMetrics.inputMetrics != None) {
println("input records " + taskEnd.taskMetrics.inputMetrics.get.recordsRead)
inputRecords += taskEnd.taskMetrics.inputMetrics.get.recordsRead
}
else {
println("task input records are empty")
}
}
}
})
这就是我写实木复合地板的方式。我不使用 savesAsTable 而不是 .parquet 我是否需要使用 .savesAsTable 来记录输出更改。我正在使用 Databricks csv 读取我的数据框
df_esd.write.mode("append")
.partitionBy("dt_skey")
.parquet(esd_hdfs_loc)
非常感谢任何帮助。
更新了运行上述代码后输出的一些图片。 通过累加器运行的内部循环的示例输出
从这 2 张图片中可以看出,写入的行日志信息量不是很大,但其他累积值信息量更大。 事实上,它只是增加了一个写的行,这没有任何意义,因为我正在写数百万条记录并在下一个记录中注明,它只为写的行打印了 8 个。
但是在运行它的代码结束时,我得到了这个。
当我在数据库中验证这是否是写入的行数时。
是一样的 对我来说,最后一个数字似乎是写入的行数。即使它不被称为。只是说行数。 同样在代码的末尾,只有一行计数,而不是其他 5 个可累加项。 只有那 1 个。 谢谢
【问题讨论】:
标签: scala apache-spark apache-spark-sql