【Question Title】: Concurrent exception for KafkaConsumer is not safe for multi-threaded access
【Posted】: 2017-12-20 07:38:43
【Question】:

We invoke a SparkSQL job from Spark Streaming. We are getting a concurrent-modification exception, followed by a "Kafka consumer is closed" error. The code and exception details are below:

Kafka consumer code

    // Start reading messages from Kafka and get a DStream
    final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
            getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, byte[]>Subscribe(SparkServiceConfParams.AIR.CONSUME_TOPICS,
                    sparkServiceConf.getKafkaConsumeParams()));

    ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

    // Extract the binary payloads from the consumer records
    JavaDStream<byte[]> messagesStream = consumerStream.map(ConsumerRecord::value);

    // Decode each binary message and generate a JSON array
    JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
        ..
    });

    // Publish the generated gzipped JSON to Kafka
    decodedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
            if (!jsonRdd4DF.isEmpty()) {
                Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);

                SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();

                AIRDataSetBean processAIRData = airMainJsonProcessor.processAIRData(json, sparkSession);
            }
        }
    });

Error details

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

And then the Kafka consumer is closed:

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: This consumer has already been closed.

【Comments】:

  • Where is the code that calls the Kafka consumer?
  • @YuvalItzchakov I have added the code.
  • What is your question? This looks like a bug in Spark...
  • @MatthiasJ.Sax Yes, it seems to be a defect in the Spark Streaming Kafka connector. Is there any workaround?
  • Not familiar with Spark — I would use Kafka Streams instead of Spark ;) (disclaimer: I actively contribute to Kafka Streams)

标签: apache-spark apache-kafka spark-streaming


【Solution 1】:

Using Spark Streaming's cache (persist) option solved this issue. With the cached RDD, Spark does not read from Kafka a second time, which resolves the problem: caching enables the stream to be consumed by multiple downstream actions. But use the cache option judiciously. Here is the code:

JavaDStream<ConsumerRecord<String, byte[]>> cache = consumerStream.cache();
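The reason this helps: without `cache()`, every downstream action on the DStream (the JSON-decoding branch and the `foreachRDD` publish branch) re-triggers a read from the same underlying KafkaConsumer, which is not thread-safe. Caching materializes the data once and lets all branches reuse it. As a plain-Java analogy (hypothetical names, not Spark or Kafka API code), a minimal sketch of "read once, reuse the result" versus re-reading per consumer:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical analogy: a lazily re-evaluated source stands in for an
// uncached DStream, which re-reads from the single KafkaConsumer on
// every downstream action.
public class CacheOnceDemo {
    static final AtomicInteger kafkaReads = new AtomicInteger();

    // Simulates the expensive, single-consumer read from Kafka.
    static String readFromKafka() {
        kafkaReads.incrementAndGet();
        return "payload";
    }

    public static void main(String[] args) {
        // Uncached: each downstream use triggers another read of the source.
        Supplier<String> uncached = CacheOnceDemo::readFromKafka;
        uncached.get(); // e.g. the JSON-decoding branch
        uncached.get(); // e.g. the foreachRDD publish branch

        // "Cached": read once, then both branches reuse the materialized
        // result -- this is what cache() achieves for the DStream.
        String cached = readFromKafka();
        String branchA = cached;
        String branchB = cached;

        // 2 uncached reads + 1 cached read = 3 total
        System.out.println(kafkaReads.get());
    }
}
```

In the Spark pipeline, the equivalent is to derive both the decoding and publishing branches from the cached stream rather than from `consumerStream` directly.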

【Discussion】:
