【问题标题】:Slow spark application - java慢火花应用程序 - java
【发布时间】:2018-08-29 19:52:44
【问题描述】:

我正在尝试创建一个 spark 应用程序,它采用 latlongtimestamp 点的数据集,如果它们增加单元格计数位于网格单元内。网格由以 lonlattime 为 z 轴的 3d 单元格组成。

现在我已经完成了应用程序,它完成了它应该做的事情,但是扫描整个数据集需要几个小时(~9g)。我的集群由 3 个节点组成,每个节点有 4 个核心,每个 8g ram,我目前使用 6 个执行器,每个执行器有 1 个核心和 2g。

我猜我可以对代码进行相当多的优化,但我的代码中是否存在导致这种延迟的大错误?

    //Create a JavaPairRDD with tuple elements. For each String line of lines we split the string 
//and assign latitude, longitude and timestamp of each line to sdx,sdy and sdt. Then we check if the data point of 
//that line is contained in a cell of the centroids list. If it is then a new tuple is returned
//with key the latitude, Longitude and timestamp (split by ",") of that cell and value 1.

    JavaPairRDD<String, Integer> pairs = lines.mapToPair(x -> {


        String sdx = x.split(" ")[2];
        String sdy = x.split(" ")[3];
        String sdt = x.split(" ")[0];

        double dx = Double.parseDouble(sdx);
        double dy = Double.parseDouble(sdy);
        int dt = Integer.parseInt(sdt);

        List<Integer> t = brTime.getValue();
        List<Point2D.Double> p = brCoo.getValue();

        double dist = brDist.getValue();
        int dur = brDuration.getValue();

        for(int timeCounter=0; timeCounter<t.size(); timeCounter++) {
            for ( int cooCounter=0; cooCounter < p.size(); cooCounter++) {

                double cx = p.get(cooCounter).getX();
                double cy = p.get(cooCounter).getY();
                int ct = t.get(timeCounter);

                String scx = Double.toString(cx);
                String scy = Double.toString(cy);
                String sct = Integer.toString(ct);

                if (dx > (cx-dist) && dx <= (cx+dist)) {
                    if (dy > (cy-dist) && dy <= (cy+dist)) {
                        if (dt > (ct-dur) && dt <= (ct+dur)) {

                            return new Tuple2<String, Integer>(scx+","+scy+","+sct,1);
                        }
                    }
                }
            }
        }
        return new Tuple2<String, Integer>("Out Of Bounds",1);
    });

【问题讨论】:

  • 你是如何阅读文件的?
  • 我从 hdfs 和磁盘尝试过,但在这两种情况下都很慢。我已经尝试了数据集的 50mb 和 350mb 部分,每个部分需要 300 秒和 10 分钟
  • 我认为您应该将文件的相当一部分加载到地图中,然后将其分发以执行,我已经有一段时间了。也许从那以后情况发生了变化。类似this
  • 但它已经分发了。它在 hdfs 上,当我说从磁盘时,我的意思是它在同一路径上所有节点的磁盘上。
  • 尝试使用 mapPartitions 更快,请参阅此示例link;另一件事是把这部分代码放在循环之外timeCounter&lt;t.size()

标签: java apache-spark cluster-computing


【解决方案1】:

尝试使用 mapPartitions 更快,请参阅此示例链接;另一件事是把这部分代码放在循环timeCounter之外

【讨论】:

    【解决方案2】:

    可能导致运行此类 Spark 地图成本的最大因素之一与 RDD 上下文之外的数据访问有关,这意味着驱动程序交互。在您的情况下,至少有 4 个变量访问器会发生这种情况:brTimebrCoobrDistbrDuration。看来您正在通过String#split 进行一些行解析,而不是利用内置函数。最后,scxscysct 都是针对每个循环计算的,尽管它们只有在它们的数字对应物通过一系列检查时才会返回,这意味着浪费了 CPU 周期和额外的 GC。

    如果不实际审查工作计划,很难说以上是否会使绩效达到可接受的水平。查看您的历史服务器应用程序日志,看看是否有任何阶段占用了您的时间 - 一旦您确定了其中的罪魁祸首,这就是真正需要优化的地方。

    【讨论】:

      【解决方案3】:

      我尝试了 mappartitionstopair 并且还移动了 scx、scy 和 sct 的计算,以便仅当点满足条件时才计算它们。应用程序的速度仅用了17分钟就得到了显着提升!我相信 mappartitionsopair 是最大的因素。非常感谢 Mks 和 bsplosion!

      【讨论】:

      • 不客气 :) 你能把我的回答作为你问题的解决方案吗
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-09
      • 1970-01-01
      • 2015-11-07
      • 2016-10-12
      • 1970-01-01
      相关资源
      最近更新 更多