【问题标题】:Pyspark Structured Streaming Kafka configuration errorPyspark Structured Streaming Kafka 配置错误
【发布时间】:2017-12-11 02:02:51
【问题描述】:

我之前已经成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流。我尝试使用在线示例:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

使用以下类似代码:

ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
query = ds1
  .writeStream
  .outputMode('append')
  .format('console')
  .start()
query.awaitTermination() 

但是,我总是遇到以下错误:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

我还尝试在创建 ds1 时将其添加到我的选项集中:

.option("partition.assignment.strategy", "range")

但是,即使明确地为它分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。

我也尝试使用“assign”选项,但出现了同样的错误(我们的 Kafka 主机设置为分配 - 每个消费者只分配一个分区,我们没有任何重新平衡)。

知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流式处理的方法?或者这是唯一的网关?

【问题讨论】:

    标签: apache-spark pyspark apache-kafka apache-spark-sql spark-structured-streaming


    【解决方案1】:
    1. Kafka 0.10.1.* 客户端存在一个已知问题,您不应该将它与 Spark 一起使用,因为它可能会由于 https://issues.apache.org/jira/browse/KAFKA-4547 而生成错误答案。您可以使用 0.10.0.1 客户端,它应该与 0.10.1.* Kafka 集群一起使用。

    2. 在Structured Streaming中向Kafka消费者客户端发送Kafka配置,需要添加kafka.前缀,如.option("kafka.partition.assignment.strategy", "range")。但是,您不需要设置kafka.partition.assignment.strategy,因为它具有默认值。我的预感是您可能将 Kafka 0.8.* 和 0.10.* jar 放在类路径上并加载错误的类。

    3. 您想使用 KafkaUtils 中的哪个 API,但结构化流中缺少哪个 API? Spark 2.2.0 刚刚发布,您可以在结构化流中使用 Kafka 的批处理或流式查询。阅读http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html 了解示例。

    【讨论】:

      【解决方案2】:

      kafka-clients-*.jar 添加到您的 spark jar 文件夹中,然后重新启动 spark master 和 worker。那你就不用加.option("partition.assignment.strategy", "range")

      【讨论】:

        【解决方案3】:

        我在 Spark 2.3.2 中使用结构化流时遇到了这个问题。就像@bruce.liu 在他的回答中暗示的那样,当 Spark 的 JVM 在其类路径中没有 kafka-clients....jar 文件时,就会发生这种情况。

        我通过下载 kafka-clients jar (https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.10.0.1) 修复了它,然后使用 --jars--driver-class-path 选项将其提供给 spark-submit。

        类似这样的:

        spark-submit --class MainClass --master local[*] --jars local:///root/sources/jars/kafka-clients-0.10.0.1.jar --driver-class-path local:///root/sources/jars/kafka-clients-0.10.0.1.jar app.jar
        

        【讨论】:

          猜你喜欢
          • 2020-07-21
          • 2019-07-30
          • 2021-04-03
          • 1970-01-01
          • 2021-05-22
          • 2020-07-25
          • 1970-01-01
          • 2020-08-05
          • 2020-04-20
          相关资源
          最近更新 更多