【发布时间】:2026-02-01 15:55:01
【问题描述】:
我正在尝试将包含大约 2.3 亿条记录的数据帧写入 Kafka。更具体地说是Kafka-enable Azure Event Hub,但我不确定这是否真的是我的问题的根源。
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()
这可以很好地启动并成功(并且非常快)将大约 3-4 百万条记录写入队列。但是几分钟后工作就停止了,并显示如下消息:
org.apache.spark.SparkException:作业因阶段故障而中止:阶段 7.0 中的任务 6 失败 4 次,最近一次失败:阶段 7.0 中丢失任务 6.3(TID 248、10.139.64.5、执行程序 1):kafkashaded .org.apache.kafka.common.errors.TimeoutException:mytopic-18 的 61 条记录到期:自上次追加以来已过去 32839 毫秒
或
org.apache.spark.SparkException:作业因阶段故障而中止:阶段 8.0 中的任务 13 失败 4 次,最近一次失败:阶段 8.0 中丢失任务 13.3(TID 348、10.139.64.5、执行程序 1):kafkashaded .org.apache.kafka.common.errors.TimeoutException: 请求超时。
另外,我从来没有看到正在创建/写入检查点文件。
我也玩过.option("kafka.delivery.timeout.ms", 30000) 和不同的值,但这似乎没有任何效果。
我在 Azure Databricks 集群 5.0 版(包括 Apache Spark 2.4.0、Scala 2.11)中运行它
我没有在我的 Event Hub 上看到任何错误,例如限制,所以应该没问题。
【问题讨论】:
-
你能分享一些你的 Spark UI 的截图(特别是执行者)
-
你是一一发消息到kafka还是批量发消息到kafka...尝试批量发消息到kafka
-
我分批发帖。只是用减少的批量再次测试。默认值约。 16000 可能太高了
-
谢谢大家,现在想通了。请参阅下面的答案。
标签: azure apache-spark pyspark apache-kafka databricks