【问题标题】:How to solve java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.flush()V error in pyspark如何解决 pyspark 中的 java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.flush()V 错误
【发布时间】:2020-09-20 23:52:07
【问题描述】:

我从 Kafka 主题中读取了一些消息,并为每个 rdd 执行函数 proccess_rdds

def spark_streaming_online():

    conf = SparkConf().setMaster("spark://antonis:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,2)

    kvs = KafkaUtils.createDirectStream(ssc,
                                        ['enriched_messages'],
                                        {"metadata.broker.list":"my-broker"},
                                        valueDecoder=lambda x: x)
    lines = kvs.window(60,2).map(lambda x: x[1])
    lines.foreachRDD(lambda y: proccess_rdds(y))


    return ssc

 ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06',lambda: spark_streaming_online())
    ssc.start()
    ssc.awaitTermination()

我不能在这里发布来自proccess_rdds 的所有内容,因为它太大了。我发布了发生错误的代码(至少我是这么认为的)。

ds = df \
     .selectExpr("CAST(value AS STRING)") \
     .write \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "my-broker") \
     .option("topic", "compressed_messages") \
     .save()

错误提示:

调用 o186.save 时出错。 :

org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 2.0 中的任务 14 失败 4 次,最近一次失败:丢失任务 14.3 在阶段 2.0 (TID 34, 192.168.1.69, executor 0): java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.flush()V

我提交这个 python 脚本:

/usr/spark/spark-2.4.5-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 --jars /home/antonis/Desktop/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.5.jar spark_streamer.py

Spark 版本:2.4.5

奇怪的是,这个脚本有时可以正常工作而没有错误。我怀疑版本有问题。

谁能帮我解决这个问题?

【问题讨论】:

  • 您可能是对的,您的某些代码尝试执行org.apache.kafka.clients.producer.KafkaProducer.flush() 并且找不到该方法。不使用结构化流的任何特殊原因?您正在使用最新的 Spark 2.4.5 版本和几乎不推荐使用的 API (DStream)
  • 不,没有特别的原因,我删除了结构化流。适用于经典 producer.send()
  • 恐怕您必须深入研究 Spark 实现并尝试找出调用 org.apache.kafka.clients.producer.KafkaProducer.flush() 的代码,然后尝试提供正确的 spark-sql-kafka 版本

标签: apache-spark pyspark apache-kafka spark-streaming


【解决方案1】:

添加这个依赖:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.2.1</version>
</dependency>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-26
    • 1970-01-01
    • 1970-01-01
    • 2011-01-26
    • 1970-01-01
    • 1970-01-01
    • 2016-02-05
    • 2018-06-22
    相关资源
    最近更新 更多