【问题标题】:Flink: DataSet.count() is bottleneck - How to count parallel?Flink:DataSet.count() 是瓶颈 - 如何计算并行?
【发布时间】:2016-12-04 05:38:02
【问题描述】:

我正在使用 Flink 学习 Map-Reduce,并且有一个关于如何有效计算 DataSet 中的元素的问题。我到目前为止是这样的:

DataSet<MyClass> ds = ...;
long num = ds.count();

执行此操作时,在我的 flink 日志中显示

12/03/2016 19:47:27 DataSink (count())(1/1) 切换到 RUNNING

所以只使用了一个 CPU(我有四个和其他命令,如 reduce 全部使用)。

我认为 count() 在内部从所有四个 CPU 收集 DataSet 并按顺序对它们进行计数,而不是让每个 CPU 计算其部分然后将其相加。这是真的吗?

如果是,我怎样才能利用我所有的 CPU?首先将我的 DataSet 映射到一个 2 元组,其中包含作为第一项的原始值和作为第二项的长值 1,然后使用 SUM 函数聚合它是一个好主意吗?

例如,DataSet 将映射到 DataSet>,其中 Long 始终为 1。因此,当我对所有项目求和时,元组的第二个值的总和将是正确的计数值。

对数据集中的项目进行计数的最佳做法是什么?

问候 西蒙

【问题讨论】:

    标签: java mapreduce apache-flink


    【解决方案1】:

    DataSet#count() 是非并行操作,因此只能使用单个线程。

    您将逐个键计数以获得并行化,并对键计数应用最终总和以获得总体计数以加快计算速度。

    【讨论】:

    • 感谢您的回答。不幸的是,我不知道如何进行按键计数。这是flink操作吗?我找不到这方面的信息
    • 我找到了解决方案。你是这个意思吗? DataSet> x = hasNum.map(new MapFunction>() { @Override public Tuple1 map(MyClass t) throws Exception { return new Tuple1(1L) ; } }).groupBy(0).sum(0);
    • 几乎 ;) 我评论你自己的答案
    【解决方案2】:

    这是一个好的解决方案吗?

    DataSet<Tuple1<Long>> x = ds.map(new MapFunction<MyClass, Tuple1<Long>>() { 
        @Override public Tuple1<Long> map(MyClass t) throws Exception { 
            return new Tuple1<Long>(1L); 
        } 
    }).groupBy(0).sum(0);
    
    Long c = x.collect().iterator().next().f0;
    

    【讨论】:

    • 一般的想法是正确的——但是,你映射到 Tuple1 并对所有元组使用相同的值——因此,所有元组最终都在同一个线程上,它实际上仍然是单线程的——甚至如果启动多个线程,则只有一个会接收数据。因此,您应该在new Tuple1&lt;Long&gt;(...) 中生成一个随机值。你使用count() 而不是sum(0)。这将为您提供需要再次总结的多个计数。
    • count() 内部不这样做有什么原因吗?
    • 没有技术原因。 Flink 理论上可以自动应用这种优化。只是碰巧它从未实施过——你需要在邮件列表中调查找出原因。
    猜你喜欢
    • 2012-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-12
    • 2022-12-06
    • 1970-01-01
    相关资源
    最近更新 更多