【Question Title】: java.lang.StringIndexOutOfBoundsException: String index out of range: -650599791
【Posted】: 2019-10-08 05:01:40
【Problem Description】:

The error is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 132.0 failed 4 times, most recent failure: Lost task 0.3 in stage 132.0:
java.lang.StringIndexOutOfBoundsException: String index out of range: -650599791
     at java.lang.String.<init>(String.java:196)
     at com.esotericsoftware.kryo.io.Input.readString(Input.java:484)
     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:195)
     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:184)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
     at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:157)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:99)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:97)

The error occurs when I call a Spark grouping method from Java. The Spark version is 2.1.0 and the Java version is 1.8.

JavaPairRDD<String, List<String>> combineRdd = pairRDD.partitionBy(new HashPartitioner(mission.getCombineCount()))
                .combineByKey(new Function<String, List<String>>() {
                    private static final long serialVersionUID = 6592724289900217307L;

                    @Override
                    public List<String> call(String v1) throws Exception {
                        List<String> re = new ArrayList<>();
                        re.add(v1);
                        return re;
                    }
                }, new Function2<List<String>, String, List<String>>() {
                    private static final long serialVersionUID = -5882646966686065803L;

                    @Override
                    public List<String> call(List<String> v1, String v2) throws Exception {
                        v1.add(v2);
                        return v1;
                    }
                }, new Function2<List<String>, List<String>, List<String>>() {
                    private static final long serialVersionUID = -1624057077103693447L;

                    @Override
                    public List<String> call(List<String> v1, List<String> v2) throws Exception {
                        v1.addAll(v2);
                        return v1;
                    }
                });
  System.out.println("group rdd count: " + combineRdd.count());

The only cause I can think of is that there is too much data. Should I do something to the data before grouping it? Are there any other possible causes?
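
Since the failure happens inside Kryo's readString during the shuffle read, I also wonder whether the serializer configuration matters. Below is a minimal sketch of how the Kryo buffers could be raised; the config keys are standard Spark settings, but the values are only guesses I have not verified against this job:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: the keys are standard Spark settings, the values are guesses.
SparkConf conf = new SparkConf()
        .setAppName("combine-by-key-job") // hypothetical app name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Initial per-core serialization buffer; the Spark 2.1.0 default is 64k.
        .set("spark.kryoserializer.buffer", "1m")
        // Upper bound for a single serialized object; the default is 64m.
        .set("spark.kryoserializer.buffer.max", "512m");
JavaSparkContext sc = new JavaSparkContext(conf);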

【Question Discussion】:

    Tags: apache-spark java-8 rdd


    【Solution 1】:

    I suspect v1.addAll(v2); is the problem; you should create and return a new list instead:

    @Override
    public List<String> call(List<String> v1, List<String> v2) throws Exception {
       List<String> list = new ArrayList<String>(v1);
       list.addAll(v2);
       return list;
       // in Java 8:
       // return Stream.concat(v1.stream(), v2.stream()).collect(Collectors.toList());
    }
    
    

    Also, if you are using Java 8, you can use lambdas and do it in one line; see the example below:

    JavaPairRDD<String, List<String>> pair = spark.range(10L)
            .toJavaRDD()
            .mapToPair(s -> Tuple2.apply(
                    Long.valueOf(s % 3).toString(),
                    Arrays.asList(s % 2, s, s + 1)
                            .stream()
                            .map(z -> z.toString())
                            .collect(Collectors.toList())
                    )
            );
    
    pair.foreach(s -> System.out.println(s._1 + "," + s._2.toString()));
    
    pair.reduceByKey((a, b) ->
            Stream.concat(a.stream(), b.stream()).collect(Collectors.toList())
    ).foreach(s -> System.out.println(s._1 + "," + s._2.toString() + "gr. count: " + s._2.size()));
    

    Output:

    0,[0, 0, 1]
    1,[1, 1, 2]
    2,[0, 2, 3]
    0,[1, 3, 4]
    1,[0, 4, 5]
    2,[1, 5, 6]
    0,[0, 6, 7]
    1,[1, 7, 8]
    2,[0, 8, 9]
    0,[1, 9, 10]
    
    0,[0, 0, 1, 1, 3, 4, 0, 6, 7, 1, 9, 10]gr. count: 12
    2,[0, 2, 3, 1, 5, 6, 0, 8, 9]gr. count: 9
    1,[1, 1, 2, 0, 4, 5, 1, 7, 8]gr. count: 9
    
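    Applying the same idea back to the combineByKey call from the question, the three anonymous classes can be written as side-effect-free lambdas. This is only a sketch: it assumes pairRDD and mission are defined exactly as in the question.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    
    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    
    // Sketch: pairRDD and mission are assumed to exist as in the question.
    JavaPairRDD<String, List<String>> combineRdd = pairRDD
            .partitionBy(new HashPartitioner(mission.getCombineCount()))
            .combineByKey(
                    // createCombiner: start a fresh, growable list per key
                    v1 -> {
                        List<String> c = new ArrayList<>();
                        c.add(v1);
                        return c;
                    },
                    // mergeValue: copy before adding, so the input combiner is not mutated
                    (c, v) -> {
                        List<String> merged = new ArrayList<>(c);
                        merged.add(v);
                        return merged;
                    },
                    // mergeCombiners: concatenate into a brand-new list
                    (a, b) -> Stream.concat(a.stream(), b.stream())
                            .collect(Collectors.toList()));

    Because every function returns a new list, no combiner is ever observed in a half-mutated state, which is the property the fix above relies on.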

    【Discussion】:
