【问题标题】:Spark streaming throws java.util.ConcurrentModificationException while adding JsonArraySpark 流在添加 JsonArray 时抛出 java.util.ConcurrentModificationException
【发布时间】:2020-04-23 05:03:31
【问题描述】:

我在以下过程中遇到了错误。我知道似乎抛出了这个错误,因为它试图读取分区(rec)中的全部记录,但同时试图将其分配给字符串(Str=jsonArray.toJSONString();)在火花流配置中使用 5 秒的批处理间隔。对这段代码有什么建议吗?请帮助。谢谢

错误在这一行:

 Str=jsonArray.toJSONString();

下面是我的全部功能:

MapRowRDD.foreachRDD(rdd ->{
            rdd.foreachPartition(
                    rec-> {
                        while(rec.hasNext()) {
                            JSONObject record = rec.next();
                            i=i+1;
                          if(TimeUnit.MINUTES.convert(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                  .parse((String) record.get("DATE_TRANSACTION"))
                                  .getTime()-DateUtils.addMinutes(new Date(), -5)
                                  .getTime(),TimeUnit.MILLISECONDS)>=0 || Integer.valueOf((String) record.get("EVENT_TYPE"))<0) {
                              jsonArray.add(record);
                            if(i % v_BATCH_WINDOW == 0)
                            {   
                                try {
                                    Str=jsonArray.toJSONString();
                                    HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                                    out_JSON=Response.getBody();
                                    log.warn("Response : " + out_JSON.toString());
                                }
                                catch(UnirestConfigException e){
                                    System.out.println("UnirestConfigException occured "+ e.toString());
                                    e.printStackTrace();
                                }
                                jsonArray.clear();
                                i=0;
                            }
                          }
                        publishToKafka(record.toString(), outputTopic, props);
                        }
                        Str=jsonArray.toJSONString();
                        if (!Str.equals("[]") && Str!=null && !Str.isEmpty()) {
                            HttpResponse<String> Response = ui.post(v_REST_API_ENDPOINT).body(Str).asString();
                        }
                        jsonArray.clear();
                        i=0;
                    }   
                    );
        });

【问题讨论】:

    标签: java arrays exception spark-streaming concurrentmodification


    【解决方案1】:

    如您所知,当您通过不同的线程同时修改和迭代同一个集合时会发生此异常。 jsonArray 不是线程安全的,将其替换为 Vector 等一些线程安全的集合,看看是否有效

    【讨论】:

    • 感谢您的建议。我试图用 StringBuilder 改变 JsonArray。 Afaik,字符串生成器不是安全线程,所以我想我可以在我的代码中实现那个东西
    猜你喜欢
    • 2018-10-02
    • 2014-01-13
    • 1970-01-01
    • 1970-01-01
    • 2012-07-28
    • 1970-01-01
    • 2014-09-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多