【问题标题】:Spark Accumulator value when the spark context is reading a folder with 100 files?火花上下文正在读取包含 100 个文件的文件夹时的火花累加器值?
【发布时间】:2017-12-26 06:24:02
【问题描述】:

当程序正在读取包含 100 个文件的文件夹时,Spark 程序计算累加器值,该累加器值初始化为 0,并将递增 1?

val myaccumulator = sc.accumulator(0)
val inputRDD= sc.wholeTextFiles("/path/to/100Files")
inputRDD.foreach(f => myaccumulator + f.count)

<console>:29: error: value count is not a member of (String, String)
   inputRDD.foreach(f => myaccumulator + f.count)
                                 ^

【问题讨论】:

  • scala> inputRDD.foreach(f => myAcc + 1) :29: error: type mismatch;发现:需要 Int(1):字符串 inputRDD.foreach(f => myAcc + 1)
  • 你试过myacc.add(1) 吗?
  • 是的,更改为“myacc.add(1)”可以工作,但累加器值不会随着作为参数传递给 wholeTextFiles("path/to/100/files" 的分区数而改变,20)。理想情况下,累加器值应根据分区和核心的数量而变化。但在这里它总是保持 100(因为有 100 个文件)。

标签: apache-spark apache-spark-sql accumulator


【解决方案1】:

如果您只想计算文件中的行数,则不需要任何花哨的东西。这样可以:

sc.textFile("path/to/dir/containing/the/files").count

如果你绝对想使用累加器,你可以这样做:

val myaccumulator = sc.accumulator(0)
sc.textFile("path/to/dir/containing/the/files").foreach(_ => myaccumulator += 1)

如果您绝对想使用 wholeTextFile(将每个文件的 whole 内容放在单个字符串中),以下任何一项都会计算行数:

sc.wholetextFiles("path/to/dir/containing/the/files")
    .map(_._2.split("\\n").size)
    .reduce(_+_)

或带蓄能器

val myaccumulator = sc.accumulator(0)
sc.wholeTextFiles
    .foreach(x => myaccumulator += x._2.split("\\n").size)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-07-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-30
    • 2020-04-05
    • 2017-04-02
    相关资源
    最近更新 更多