【发布时间】:2020-09-20 23:52:07
【问题描述】:
我从 Kafka 主题中读取了一些消息,并为每个 rdd 执行函数 proccess_rdds。
def spark_streaming_online():
conf = SparkConf().setMaster("spark://antonis:7077").setAppName("Kafka_Spark")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,2)
kvs = KafkaUtils.createDirectStream(ssc,
['enriched_messages'],
{"metadata.broker.list":"my-broker"},
valueDecoder=lambda x: x)
lines = kvs.window(60,2).map(lambda x: x[1])
lines.foreachRDD(lambda y: proccess_rdds(y))
return ssc
ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06',lambda: spark_streaming_online())
ssc.start()
ssc.awaitTermination()
我不能在这里发布来自proccess_rdds 的所有内容,因为它太大了。我发布了发生错误的代码(至少我是这么认为的)。
ds = df \
.selectExpr("CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "my-broker") \
.option("topic", "compressed_messages") \
.save()
错误提示:
调用 o186.save 时出错。 :
org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 2.0 中的任务 14 失败 4 次,最近一次失败:丢失任务 14.3 在阶段 2.0 (TID 34, 192.168.1.69, executor 0): java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.flush()V
我提交这个 python 脚本:
/usr/spark/spark-2.4.5-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 --jars /home/antonis/Desktop/jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.5.jar spark_streamer.py
Spark 版本:2.4.5
奇怪的是,这个脚本有时可以正常工作而没有错误。我怀疑版本有问题。
谁能帮我解决这个问题?
【问题讨论】:
-
您可能是对的,您的某些代码尝试执行
org.apache.kafka.clients.producer.KafkaProducer.flush()并且找不到该方法。不使用结构化流的任何特殊原因?您正在使用最新的 Spark 2.4.5 版本和几乎不推荐使用的 API (DStream) -
不,没有特别的原因,我删除了结构化流。适用于经典 producer.send()
-
恐怕您必须深入研究 Spark 实现并尝试找出调用
org.apache.kafka.clients.producer.KafkaProducer.flush()的代码,然后尝试提供正确的spark-sql-kafka版本
标签: apache-spark pyspark apache-kafka spark-streaming