【问题标题】:Kafka batching configuration is not consistentKafka 批处理配置不一致
【发布时间】:2020-03-25 02:54:18
【问题描述】:

我正在尝试最大化从 Kafka 批量处理的数据量。输出是我将数据写入服务器上的文件。我正在为我的消费者配置添加极高的值,但我仍然收到多个文件大小非常小的文件。

如下所示,我等待很长时间才能检索我的最小字节数。大约 20 秒后,轮询完成 N 条记录并写入一个非常小的文件。我将其解释为不尊重等待时间,也不尊重最小字节。为什么会这样?

代码:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, args.maxPartitionFetchBytes);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);

消费者配置:

--max_fetch_bytes 2147483000
--min_fetch_bytes 2147483000
--max_poll_records 2147483000
--max_partition_fetch_bytes 2147483000
--enable_auto_commit false
--fetch_max_wait 900000

【问题讨论】:

  • 您没有告诉我们consumer.poll(long timeout) 中设置的超时时间。
  • 1000 毫秒是轮询持续时间

标签: java apache-kafka


【解决方案1】:

here's 可以提前终止 DelayedFetch 的列表:

/**
* The operation can be completed if:
*
* Case A: This broker is no longer the leader for some partitions it tries to fetch
* Case B: The replica is no longer available on this broker
* Case C: This broker does not know of some partitions it tries to fetch
* Case D: The partition is in an offline log directory on this broker
* Case E: This broker is the leader, but the requested epoch is now fenced
* Case F: The fetch offset locates not on the last segment of the log
* Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
* Case H: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
* Upon completion, should return whatever data is available for each valid partition
*/

请注意,案例 F 可能发生在分段滚动代理端 - 您多久会发生一次?

【讨论】:

    【解决方案2】:

    将此配置添加到您的消费者:

    max.poll.interval.ms=3600000  
    

    如果它不起作用,也添加它们:

    connections.max.idle.ms=3600000  
    default.api.timeout.ms=3600000  
    

    现在批处理应该最多等待一小时。

    【讨论】:

    • 我早些时候做了 max.poll.interval 并在处理我的初始批次后得到了消费者唤醒异常。有什么原因会发生这种情况吗?配置很棘手
    猜你喜欢
    • 2018-02-21
    • 1970-01-01
    • 2012-10-17
    • 1970-01-01
    • 1970-01-01
    • 2023-03-27
    • 2019-12-15
    • 1970-01-01
    • 2017-08-11
    相关资源
    最近更新 更多