[Posted]: 2021-09-14 05:10:46
[Question]:
I am using kafka-clients 2.5.1 and have a Kafka Streams application with the following properties:
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
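For context, here is a stdlib-only sketch of that configuration using the raw property names the `StreamsConfig`/`ConsumerConfig` constants resolve to (no Kafka dependency needed to run it; the class name is mine, not from the original):

```java
import java.util.Properties;

public class StreamsProps {
    public static Properties build() {
        Properties properties = new Properties();
        // Raw string keys equivalent to the constants used above
        properties.put("num.stream.threads", "1");
        // Integer.MAX_VALUE ms = 2147483647 ms (~24.8 days)
        properties.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        properties.put("session.timeout.ms", "15000");
        properties.put("heartbeat.interval.ms", "3000");
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.interval.ms"));
    }
}
```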
Processing a single record takes about 30 minutes on average, but after each one the following error appears and the application starts re-reading the message it just processed:
02-07-2021 02:09:31.026 [employee_groups-client-StreamThread-1] WARN
org.apache.kafka.streams.processor.internals.StreamThread.runLoop - stream-thread
[employee_groups-client-StreamThread-1] Detected task 0_0 that got migrated to another thread.
This implies that this thread missed a rebalance and dropped out of the consumer group. Will
try to rejoin the consumer group. Below is the detailed description of the task:
>TaskId: 0_0
>> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [us4lat-employee_group_migration1]
> children: [KSTREAM-FOREACH-0000000001]
> KSTREAM-FOREACH-0000000001:
>Partitions [us4lat-employee_group_migration1-0]
org.apache.kafka.streams.errors.TaskMigratedException: Client request for task 0_0 has been
fenced due to a rebalance
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:530) ~
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478) ~
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) ~
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543) ~
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977) ~
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) ~
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
[kafka-streams-2.5.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
[kafka-streams-2.5.1.jar!/:?]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed
since the group has already rebalanced and assigned the partitions to another member. This
means that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time
message processing. You can address this either by increasing max.poll.interval.ms or by
reducing the maximum size of batches returned in poll() with max.poll.records.
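The exception message itself points at two knobs: raising max.poll.interval.ms or lowering max.poll.records. As a hedged illustration (raw property names; the values are examples, not recommendations from the original), fetching fewer records per poll() shortens each poll cycle:

```java
import java.util.Properties;

public class PollTuning {
    public static Properties tuned() {
        Properties props = new Properties();
        // Option 1: allow more time between poll() calls
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        // Option 2: return fewer records per poll() so each batch finishes sooner
        // (the consumer default is 500; "1" is the extreme for one
        // long-running record at a time)
        props.put("max.poll.records", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tuned().getProperty("max.poll.records"));
    }
}
```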
If the maximum poll interval is set to Integer.MAX_VALUE, why does the exception above still occur every time a 30-plus-minute record finishes processing? How can I fix this? Am I missing something? I have also verified from the Kafka Streams logs that max.poll.interval.ms is set to the expected value.
[Discussion]:
Tags: apache-kafka kafka-consumer-api apache-kafka-streams