【发布时间】:2021-05-05 01:09:33
【问题描述】:
我尝试执行代码,但出现以下错误: java.lang.OutOfMemoryError:Java 堆空间 org.apache.spark.shuffle.MetadataFetchFailedException:缺少 shuffle 4 的输出位置
代码可以在小文件(一些 kb)上执行,但在“大”文件(5mb)上我会出错。我尝试增加 VM 内存和 spark.driver.memory 但我再次遇到相同的错误。
parkConf sparkConf = new SparkConf().setAppName("aName");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaPairRDD<String, String> edges = lines.flatMapToPair(t -> {
List<Tuple2<String,String>> result = new ArrayList<>();
if(!t.contains("#")) {
String [] nodes = SPACE.split(t);
if(Long.parseLong(nodes[0])<Long.parseLong(nodes[1])) {
result.add(new Tuple2<>(nodes[0], nodes[1]));
} else {
result.add(new Tuple2<>(nodes[1], nodes[0]));
}
}
return result.iterator();
});
JavaPairRDD<String, String> edgesReverse = edges.mapToPair(t -> {
return new Tuple2<>(t._2(), t._1());
});
JavaPairRDD<String, Tuple2<String, String>> rdd1 = edges.join(edgesReverse);
JavaPairRDD<String, Tuple2<String, String>> rdd2 = edges.join(edges);
JavaPairRDD<String, Tuple2<String, String>> allRDD = rdd1.union(rdd2).distinct();
JavaPairRDD<Tuple2<String, String>,Double> commonNeighbors = allRDD.mapToPair(t -> {
if(Long.parseLong(t._2()._1())<Long.parseLong(t._2()._2())) {
return new Tuple2<>(t._2()._1(),t._2()._2());
} else {
return new Tuple2<>(t._2()._2(),t._2()._1());
}
}).subtract(edges).mapToPair(t->{
return new Tuple2<>(t,Double.parseDouble("1"));
}).reduceByKey((a,b)->a+b).mapToPair(t -> {
return new Tuple2<>(t._2(),t._1());
}).sortByKey(false).mapToPair(t -> {
return new Tuple2<>(t._2(),t._1());
});
commonNeighbors.saveAsTextFile(args[1]);
【问题讨论】:
-
处理 OOM 的一个好方法是使用 Heap Dump 并使用 Eclipse MAT 等工具对其进行分析,以找出哪个对象没有被 GC'ed
标签: java apache-spark rdd java-pair-rdd