【问题标题】:Unable to set kafka spark consumer configs无法设置 kafka spark 消费者配置
【发布时间】:2020-05-21 09:11:49
【问题描述】:

我使用 spark-sql-2.4.x 版本的 kafka 客户端。

即使在设置了消费者配置参数之后 IE。 max.partition.fetch.bytes & max.poll.records

未正确设置并显示如下默认值

Dataset<Row> df = sparkSession
                      .readStream()
                      .format("kafka")
                      .option("kafka.bootstrap.servers", server1)
                      .option("subscribe", TOPIC1) 
                      .option("includeTimestamp", true)
                      .option("startingOffsets", "latest")
                      .option("max.partition.fetch.bytes", "2097152") // default 1000,000
                      .option("max.poll.records", 6000)  // default 500
                      .option("metadata.max.age.ms", 450000) // default 300000
                      .option("failOnDataLoss", false)
                      .load();

启动消费者时,日志中仍显示如下:

[Executor task launch worker for task 21] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

设置这个的正确方法是什么?

【问题讨论】:

    标签: apache-spark apache-kafka apache-spark-sql kafka-consumer-api spark-structured-streaming


    【解决方案1】:

    来自documentation

    Kafka 自己的配置可以通过 DataStreamReader.option 设置 卡夫卡。前缀,例如 stream.option("kafka.bootstrap.servers", “主机:端口”)。有关可能的 kafka 参数,请参阅Kafka consumer config 与读取数据相关的参数的文档,以及Kafka producer config 与写入数据相关的参数的文档。

    我相信您需要添加“kafka”。根据您的选择,例如:

    .option("kafka.max.poll.records", 6000) 
    

    【讨论】:

      【解决方案2】:

      你可以使用属性

      .("maxOffsetsPerTrigger","6000")
      

      保持微批量大小。它会在每个微批次中获取最多 6000 个偏移量

      【讨论】:

        猜你喜欢
        • 2018-03-05
        • 1970-01-01
        • 2020-05-23
        • 2019-03-26
        • 1970-01-01
        • 2017-02-23
        • 2020-01-06
        • 2020-09-05
        • 2017-02-14
        相关资源
        最近更新 更多