【问题标题】:In Spark is counting the records in an RDD expensive task?在 Spark 中计算 RDD 昂贵任务中的记录?
【发布时间】:2016-08-11 23:03:39
【问题描述】:

在 Hadoop 中,当我使用 inputformat reader 时,作业级别的日志会报告读取了多少条记录,它还会显示字节数等。

在 Spark 中,当我使用相同的输入格式阅读器时,我没有得到这些指标。

所以我想我会使用 inputformat 阅读器来填充 rdd,然后只发布 rdd 中的记录数(rdd 的大小)。

我知道rdd.count() 返回 rdd 的大小。

但是,我不清楚使用count() 的成本?例如:

  • 是分布式函数吗?每个分区是否会报告其计数并将计数相加并报告?还是整个 rdd 都带入驱动并计算在内?
  • 执行count() 后,rdd 是否仍保留在内存中,还是必须显式缓存它?
  • 有没有更好的方法来做我想做的事,即在对记录进行操作之前对记录进行计数?

【问题讨论】:

    标签: java hadoop apache-spark


    【解决方案1】:

    是分布式函数吗?每个分区是否会报告其计数 并将计数相加并报告?还是整个rdd都带了 进入驱动程序并数数?

    计数是分布的。在火花命名法中,计数是一个“动作”。所有动作都是分布式的。确实,只有少数东西可以将所有数据带到驱动程序节点,而且它们通常都有很好的文档记录(例如获取、收集等)

    执行 count() 后,rdd 是否仍保留在内存中或执行 我必须显式缓存它?

    不,数据不会在内存中。如果你想要它,你需要在计数之前显式缓存。在采取行动之前,Spark 的惰性求值不会进行任何计算。除非有缓存调用,否则在 Action 之后不会将任何数据存储在内存中。

    有没有更好的方法来做我想做的事,即计算 在对它们进行操作之前记录?

    缓存、计数、操作似乎是一个可靠的计划

    【讨论】:

    • 其实有点搞笑:因为rdd count是分布式操作,但是对于dataset看起来不是:def count(): Long = withCallback("count", groupBy().count ()) { df => df.collect(needCallback = false).head.getLong(0) } 。知道为什么吗?
    • @KonstantinKulagin 在这种情况下它带入内存的df是一个数字为1的DF,即原始DF的行数
    • 对于计数操作,您可以使用累加器。
    • @David ,所以你说 rdd.cache().count() 会更好(或) someRDD = rdd.cache();长 numberOfRows = rdd.count()。 ,哪一个是正确的?
    • 在计算上,它们是等价的
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-04
    • 1970-01-01
    • 2013-09-16
    • 2018-03-08
    • 1970-01-01
    相关资源
    最近更新 更多