【问题标题】:Counting the number of output rows Apache Spark error for outputMetrics计算 outputMetrics 的输出行数 Apache Spark 错误
【发布时间】:2017-06-13 17:33:42
【问题描述】:

所以我读了这个答案Spark: how to get the number of written rows?How to get the number of records written (using DataFrameWriter's save operation)?,它真的很有帮助,对我的意见很有用。

但是出于某种原因,即使我在代码中多次写入 parquet,它也总是没有输出。 (taskEnd.taskMetrics.outputMetrics= None) 始终没有。

添加了带有 accumulables 的示例代码,但输出仍然没有为我提供输入行的正确结果,它似乎对该 accumulable 正常工作。

我使用的是 Scala 和 Spark 1.6。

我有 2 个问题。

  1. 如何使用 spark 1.6 解决此问题

  2. 使用较新版本的 spark 是否可以正常工作。

附上我在 Spark 1.6 中的日志

var sc = new SparkContext(sparkConf)
sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      if (taskEnd.taskInfo.accumulables.nonEmpty) {

        for (i <- 0 until 6) {
          println()
          if (taskEnd.taskInfo.accumulables.length > i) {
            println("value of i " + i)
            println("name = " + taskEnd.taskInfo.accumulables(i).name)
            println("value =  " + taskEnd.taskInfo.accumulables(i).value)
          }  

      if (taskEnd.taskMetrics.inputMetrics != None) {
        println("input records " + taskEnd.taskMetrics.inputMetrics.get.recordsRead)
        inputRecords += taskEnd.taskMetrics.inputMetrics.get.recordsRead
      }
      else {
        println("task input records are empty")
      }

    }
  }
})

这就是我写实木复合地板的方式。我不使用 savesAsTable 而不是 .parquet 我是否需要使用 .savesAsTable 来记录输出更改。我正在使用 Databricks csv 读取我的数据框

df_esd.write.mode("append")
  .partitionBy("dt_skey")
  .parquet(esd_hdfs_loc)

非常感谢任何帮助。

更新了运行上述代码后输出的一些图片。 通过累加器运行的内部循环的示例输出

从这 2 张图片中可以看出,写入的行日志信息量不是很大,但其他累积值信息量更大。 事实上,它只是增加了一个写的行,这没有任何意义,因为我正在写数百万条记录并在下一个记录中注明,它只为写的行打印了 8 个。

但是在运行它的代码结束时,我得到了这个。

当我在数据库中验证这是否是写入的行数时。

是一样的 对我来说,最后一个数字似乎是写入的行数。即使它不被称为。只是说行数。 同样在代码的末尾,只有一行计数,而不是其他 5 个可累加项。 只有那 1 个。 谢谢

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    如果你看

    taskEnd.taskInfo.accumulables
    

    您会看到它与ListBuffer中的AccumulableInfo按顺序捆绑在一起。

    AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None), 
    AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None), 
    AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None), 
    AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None), 
    AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None), 
    AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql)
    

    可以清楚的看到输出的行数在listBuffer的第7位,所以正确的获取写入行数的方法是

    taskEnd.taskInfo.accumulables(6).value.get
    

    【讨论】:

    • '输出行数'表示所说的内容..任务的输出行。这并不总是意味着最后的写作。
    猜你喜欢
    • 2014-08-25
    • 1970-01-01
    • 1970-01-01
    • 2018-03-13
    • 1970-01-01
    • 1970-01-01
    • 2017-06-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多