【问题标题】:Kafka Connect: java.lang.IllegalStateException: No current assignment for partitionKafka Connect:java.lang.IllegalStateException:分区没有当前分配
【发布时间】:2019-08-14 23:25:46
【问题描述】:

我在 Kubernetes(8-16 个节点,自动缩放)上运行 Kafka Connect。我总共定义了 44 个连接器,每个 Kafka 主题一个(每个主题一个分区)。这些主题由 Debezium / Postgresql 生成。有 3 个 Kafka 节点。每个连接器都将 tasks.max 设置为 4。由于 java.lang.IllegalStateException: No current assignment for partition -0,我的大多数连接器(但不是每个!)都有一个(总是一个)失败的任务。

这里不是 Kafka 专家,请注意 ;) 我假设有 3 个 Kafka 节点,所以 3 个工作人员做得很好,而第 4 个任务没有任何连接,所以它失败了。但为什么有时有 4 个任务运行良好?

另外,我经常遇到“由于重新平衡而导致操作冲突”的问题,这种情况可能会持续几分钟甚至几小时。最近我删除了所有的 pod,它们自己重新启动,问题消失了,但这不是长期的解决方案。

tasks.max 的推荐值是多少?提前致谢!

例外:

java.lang.IllegalStateException: No current assignment for partition table-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748

接收器连接器配置:

connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas   true
sanitizeTopics  true
autoCreateTables    true
topics  <topic-name>
tasks.max   3
schemaRegistryLocation  http://<ip>:8081
project <big-query-project>
maxWriteSize    10000
datasets    .*=<big-query-dataset>
task.class  com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name    <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait  1000
bufferSize  100000

它会抛出异常java.lang.IllegalStateException: No current assignment for [...]

【问题讨论】:

  • 你能附上一些日志吗?
  • @wardziniak 我在上面添加了跟踪,有帮助吗?
  • 你在用io.debezium.connector.postgresql.PostgresConnector吗?
  • 是的@wardziniak,但这不在我的控制之下。这是 0.9.2.Final
  • 什么SinkConnector 抛出异常?你能附上它的配置吗?

标签: apache-kafka apache-kafka-connect rebalancing


【解决方案1】:

属性tasks.max 的价值取决于几个因素。最重要的是特定的连接器。 特定连接器取决于其逻辑和tasks.max 的值计算Task 的数量,将被创建。 前任。 FileStreamSourceConnector 始终创建 1 个任务,因此即使您传递的值高于 1,它也只会创建一个。 同样的情况是PostgresConnector平行 到一。

tasks.max 的值还应取决于其他因素,例如:Kafka Connect 模式、您拥有的 Kafka Connect 实例数量、机器 CPU 等。

我如何理解您正在使用 SourceConnector (PostgresConnector)。 源连接器不会轮询来自 Kafka 的数据。例外,您发布的内容与某些 SinkConnector 有关。 如果使用的是SinkConnector,您的tasks.max 不应超过分区数。 如果您启动的任务多于分区数,一些任务将处于空闲状态(状态正在运行,但它们不处理数据)并且可能会发生重新平衡。

【讨论】:

  • 不是,真的。我根本不使用源连接器。接收器连接器 38 源连接器 0 连接器使用的主题 36
  • 鉴于您在上面的回答,我开始认为 task.max 在我的情况下应该是 1,但想了解为什么大多数连接器都可以正常使用 tasks.max - 1 config。跨度>
  • 每个主题一个分区
  • 在 Kafka Connect 底层使用 pure 消费者,因此在这种情况下使用多个任务是没有意义的
  • 一般适用于所有sink连接器。 Task(创建的)的确切数量是Connector::taskConfigs 返回的列表的大小。没有 long 重新平衡和异常的情况下,还有哪些其他连接器可以工作?
猜你喜欢
  • 2017-06-28
  • 1970-01-01
  • 2018-09-02
  • 1970-01-01
  • 2017-04-21
  • 2018-06-21
  • 1970-01-01
  • 2020-12-26
  • 2018-10-08
相关资源
最近更新 更多