【发布时间】:2020-03-25 02:54:18
【问题描述】:
我正在尝试最大化从 Kafka 批量处理的数据量。输出是我将数据写入服务器上的文件。我正在为我的消费者配置添加极高的值,但我仍然收到多个文件大小非常小的文件。
如下所示,我等待很长时间才能检索我的最小字节数。大约 20 秒后,轮询完成 N 条记录并写入一个非常小的文件。我将其解释为不尊重等待时间,也不尊重最小字节。为什么会这样?
代码:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, args.maxPartitionFetchBytes);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
消费者配置:
--max_fetch_bytes 2147483000
--min_fetch_bytes 2147483000
--max_poll_records 2147483000
--max_partition_fetch_bytes 2147483000
--enable_auto_commit false
--fetch_max_wait 900000
【问题讨论】:
-
您没有告诉我们
consumer.poll(long timeout)中设置的超时时间。 -
1000 毫秒是轮询持续时间
标签: java apache-kafka