【问题标题】:How to measure the time Spark needs to run an action on partitioned RDD?如何测量 Spark 在分区 RDD 上运行操作所需的时间?
【发布时间】:2015-06-03 16:43:35
【问题描述】:

我编写了一个小型 Spark 应用程序,它应该测量 Spark 在分区 RDD 上运行操作所需的时间(combineByKey 函数对值求和)。

我的问题是,第一次迭代似乎工作正常(计算持续时间约为 25 毫秒),但下一次迭代所需的时间要少得多(约 5 毫秒)。在我看来,Spark 在没有任何请求的情况下保留数据!?我可以通过编程方式避免这种情况吗?

我必须知道 Spark 计算新 RDD 所需的持续时间(没有任何缓存/保留早期迭代)--> 我认为持续时间应该始终在 20-25 毫秒左右!

为了确保重新计算,我将 SparkContext 生成移到 for 循环中,但这并没有带来任何变化...

感谢您的建议!

我的代码似乎可以保留任何数据:

public static void main(String[] args) {

    switchOffLogging();

    // jetzt

    try {
        // Setup: Read out parameters & initialize SparkContext
        String path = args[0];
        SparkConf conf = new SparkConf(true);
        JavaSparkContext sc;

        // Create output file & writer
        System.out.println("\npar.\tCount\tinput.p\tcons.p\tTime");

        // The RDDs used for the benchmark
        JavaRDD<String> input = null;
        JavaPairRDD<Integer, String> pairRDD = null;
        JavaPairRDD<Integer, String> partitionedRDD = null;
        JavaPairRDD<Integer, Float> consumptionRDD = null;

        // Do the tasks iterative (10 times the same benchmark for testing)
        for (int i = 0; i < 10; i++) {
            boolean partitioning = true;
            int partitionsCount = 8;

            sc = new JavaSparkContext(conf);
            setS3credentials(sc, path);

            input = sc.textFile(path);
            pairRDD = mapToPair(input);

            partitionedRDD = partition(pairRDD, partitioning, partitionsCount);

            // Measure the duration
            long duration = System.currentTimeMillis();
            // Do the relevant function
            consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
            duration = System.currentTimeMillis() - duration;

            // So some action to invoke the calculation
            System.out.println(consumptionRDD.collect().size());

            // Print the results
            System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t" + input.partitions().size() + "\t" + consumptionRDD.partitions().size() + "\t" + duration + " ms");

            input = null;
            pairRDD = null;
            partitionedRDD = null;
            consumptionRDD = null;

            sc.close();
            sc.stop();

        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(e.getMessage());
    }
}

一些辅助函数(应该不是问题):

private static void switchOffLogging() {
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
}

private static void setS3credentials(JavaSparkContext sc, String path) {
    if (path.startsWith("s3n://")) {
        Configuration hadoopConf = sc.hadoopConfiguration();
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
        hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
    }
}

// Initial element
private static Function<String, Float> createCombiner = new Function<String, Float>() {
    public Float call(String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        return value;
    }
};

// merging function for a new dataset
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
    public Float call(Float sumYet, String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        sumYet += value;
        return sumYet;
    }
};

// function to sum the consumption
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
    public Float call(Float a, Float b) throws Exception {
        a += b;
        return a;
    }
};

private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) {
    if (partitioning) {
        return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
    } else {
        return pairRDD;
    }
}

private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
    return input.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
            String[] data = debsDataSet.split(",");
            int houseId = Integer.valueOf(data[6]);
            return new Tuple2<Integer, String>(houseId, debsDataSet);
        }
    });
}

最后是 Spark 控制台的输出:

part.   Count   input.p cons.p  Time
true    8       6       8       20 ms
true    8       6       8       23 ms
true    8       6       8       7 ms        // Too less!!!
true    8       6       8       21 ms
true    8       6       8       13 ms
true    8       6       8       6 ms        // Too less!!!
true    8       6       8       5 ms        // Too less!!!
true    8       6       8       6 ms        // Too less!!!
true    8       6       8       4 ms        // Too less!!!
true    8       6       8       7 ms        // Too less!!!

【问题讨论】:

    标签: java caching apache-spark


    【解决方案1】:

    我现在找到了一个解决方案:我编写了一个单独的类,它在一个新进程上调用 spark-submit 命令。这可以在一个循环中完成,因此每个基准测试都在一个新线程中启动,并且 sparkContext 也是每个进程分开的。这样垃圾收集就完成了,一切正常!

    String submitCommand = "/root/spark/bin/spark-submit " + submitParams + " --   class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
    Process p = Runtime.getRuntime().exec(submitCommand);
    
    BufferedReader reader;
    String line;
    
    System.out.println(p.waitFor());
    reader = new BufferedReader(new InputStreamReader(p.getInputStream()));         
    while ((line = reader.readLine())!= null) {
      System.out.println(line);
    }
    

    【讨论】:

      【解决方案2】:

      如果 shuffle 输出足够小,那么 Spark shuffle 文件将写入 OS 缓冲区缓存,因为 fsync 没有显式调用...这意味着,只要有空间,您的数据就会保留在内存中。

      如果确实需要进行冷性能测试,那么您可以尝试this attempt to flush the disk 之类的方法,但这会减慢每次测试的中间速度。你能把上下文上下旋转吗?这可能会解决您的需求。

      【讨论】:

      • 尝试了来自link的linux命令,它说运行1)“sudo sync”和2)“echo 3 > sudo /proc/sys/vm/drop_caches”没有成功......我也尝试了 SparkConf.set 方法 SparkConf conf = new SparkConf(true).set("spark.files.useFetchCache", "false"); 但也没有效果...
      猜你喜欢
      • 1970-01-01
      • 2016-09-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-03
      • 1970-01-01
      相关资源
      最近更新 更多