[Posted]: 2019-10-08 05:01:40
[Question]:
Here is the error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 132.0 failed 4 times, most recent failure: Lost task 0.3 in stage 132.0:
java.lang.StringIndexOutOfBoundsException: String index out of range: -650599791
    at java.lang.String.<init>(String.java:196)
    at com.esotericsoftware.kryo.io.Input.readString(Input.java:484)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:195)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:184)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
    at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:157)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:99)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:97)
The error occurs when I call Spark's grouping method from Java. The Spark version is 2.1.0 and the Java version is 1.8.
JavaPairRDD<String, List<String>> combineRdd = pairRDD
        .partitionBy(new HashPartitioner(mission.getCombineCount()))
        .combineByKey(
                // createCombiner: wrap the first value seen for a key in a new list
                new Function<String, List<String>>() {
                    private static final long serialVersionUID = 6592724289900217307L;

                    @Override
                    public List<String> call(String v1) throws Exception {
                        List<String> re = new ArrayList<>();
                        re.add(v1);
                        return re;
                    }
                },
                // mergeValue: append a value to the list already built in this partition
                new Function2<List<String>, String, List<String>>() {
                    private static final long serialVersionUID = -5882646966686065803L;

                    @Override
                    public List<String> call(List<String> v1, String v2) throws Exception {
                        v1.add(v2);
                        return v1;
                    }
                },
                // mergeCombiners: merge the lists built on different partitions
                new Function2<List<String>, List<String>, List<String>>() {
                    private static final long serialVersionUID = -1624057077103693447L;

                    @Override
                    public List<String> call(List<String> v1, List<String> v2) throws Exception {
                        v1.addAll(v2);
                        return v1;
                    }
                });
System.out.println("group rdd count: " + combineRdd.count());
The only cause I can think of is that there is too much data. Should I do something to the data before grouping it? Are there any other possible causes? For example, I could sample the keys before grouping to check whether the data is skewed, as in the sketch below.
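A minimal sketch of such a check, assuming pairRDD is the same JavaPairRDD<String, String> used above (the 0.01 sample fraction and the top-10 limit are arbitrary illustrations, and java.util.Map must be imported):

// Sample ~1% of the records without replacement and count records per key,
// then print the 10 most frequent keys to see whether one key dominates.
Map<String, Long> sampleCounts = pairRDD
        .sample(false, 0.01)
        .countByKey();

sampleCounts.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .limit(10)
        .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));

If a single key accounts for a very large share of the sampled records, the list built for that key during combineByKey would also be very large.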
[Discussion]:
Tags: apache-spark java-8 rdd