【Question Title】: Kafka Connect Bigquery Sink Connector - Offset commit failed during close
【Posted】: 2021-03-17 05:50:37
【Question】:

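I am trying to set up the Kafka Connect BigQuery sink connector. I have more than 50 Kafka topics. When I deploy the connector, 10 tables initially appear with some data in them (10 is, I think, the default value of threadPoolSize in the connector config). After that, new data stops arriving in those tables, and no new tables show up in BigQuery.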

My connector config:

{
  "name": "kcbq-connect1",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "topic1,topic2,topic2",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "autoUpdateSchemas": "true",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
    "schemaRegistryLocation": "http://localhost:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "bufferSize": "100000",
    "maxWriteSize": "10000",
    "tableWriteWait": "1000",
    "project": "my-project-89507",
    "datasets": ".*=my_cdc",
    "keyfile": "/home/debezium/key.json"
  }
}

This is what I get when I query the connector status:

{
  "name": "kcbq-connect1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.1.0.37:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.1.0.37:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail\n\tat com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:112)\n\tat com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:190)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 10 more\n"
    }
  ],
  "type": "sink"
}
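
The `trace` field in the status above is a single escaped string, which makes the innermost exception hard to spot. A minimal Python sketch for pulling the root-cause line out of each failed task is given here. The function name `failed_task_causes` is made up for illustration, and `sample_status` is an abbreviated copy of the payload above with most stack frames omitted.

```python
def failed_task_causes(status: dict) -> list:
    """Return (task id, root-cause line) for every FAILED task in a status payload."""
    results = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        trace_lines = task.get("trace", "").splitlines()
        # The last "Caused by:" line is the innermost exception;
        # fall back to the first trace line when there is none.
        causes = [line for line in trace_lines if line.startswith("Caused by:")]
        if causes:
            summary = causes[-1]
        elif trace_lines:
            summary = trace_lines[0]
        else:
            summary = ""
        results.append((task["id"], summary))
    return results


# Abbreviated copy of the status payload shown above (stack frames omitted).
sample_status = {
    "name": "kcbq-connect1",
    "connector": {"state": "RUNNING", "worker_id": "10.1.0.37:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.1.0.37:8083",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: "
                "Exiting WorkerSinkTask due to unrecoverable exception.\n"
                "\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n"
                "Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: "
                "Some write threads encountered unrecoverable errors: "
                "com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; "
                "See logs for more detail\n"
            ),
        }
    ],
    "type": "sink",
}

for task_id, cause in failed_task_causes(sample_status):
    print(f"task {task_id}: {cause}")
```

In real use the payload would come from the Kafka Connect REST endpoint `GET /connectors/kcbq-connect1/status` on the worker. Here the root cause only says that write threads failed and refers to the worker log for details.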

Trace log:

[2020-12-05 13:07:22,399] WARN WorkerSinkTask{id=kcbq-connect1-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:389)
[2020-12-05 13:07:22,399] ERROR WorkerSinkTask{id=kcbq-connect1-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
java.util.concurrent.RejectedExecutionException: Task com.wepay.kafka.connect.bigquery.write.batch.CountDownRunnable@6231b75c rejected from com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor@4ebc3f40[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 65]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:92)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:129)
        at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:386)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:618)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:694)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:751)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290)
        at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:964)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-12-05 13:07:22,400] INFO [Consumer clientId=connector-consumer-kcbq-connect1-0, groupId=connect-kcbq-connect1] Member connector-consumer-kcbq-connect1-0-f7533028-4e1a-4492-9f56-9b2d7bc1bc4e sending LeaveGroup request to coordinator <hostname>:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1005)
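
The `RejectedExecutionException` in this excerpt reports the connector's executor as `Terminated`, so it may be a follow-on error from the task shutting down rather than the original failure. The task trace itself says "See logs for more detail". One way to scan a long worker log for every ERROR entry and the exception attached to it is sketched below in Python. It assumes the log header format seen in the excerpt, and the helper name `error_entries` is made up for illustration.

```python
import re

# Header lines look like: [2020-12-05 13:07:22,399] ERROR message (logger:261)
HEADER = re.compile(r"^\[(?P<ts>[^\]]+)\] (?P<level>[A-Z]+) (?P<msg>.*)$")


def error_entries(lines):
    """Return (timestamp, message, exception) for each ERROR entry.

    `exception` is the class name on the first continuation line after
    the header, or None when the entry has no continuation line.
    """
    entries = []
    current = None
    for raw in lines:
        line = raw.rstrip("\n")
        match = HEADER.match(line)
        if match:
            # A new header closes the previous entry.
            current = None
            if match["level"] == "ERROR":
                current = [match["ts"], match["msg"], None]
                entries.append(current)
        elif current is not None and current[2] is None and line.strip():
            # First non-empty continuation line: keep only the exception class.
            current[2] = line.split(":", 1)[0].strip()
    return [tuple(entry) for entry in entries]


# The first lines of the excerpt above, copied verbatim.
sample_log = [
    "[2020-12-05 13:07:22,399] WARN WorkerSinkTask{id=kcbq-connect1-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:389)",
    "[2020-12-05 13:07:22,399] ERROR WorkerSinkTask{id=kcbq-connect1-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask:261)",
    "java.util.concurrent.RejectedExecutionException: Task com.wepay.kafka.connect.bigquery.write.batch.CountDownRunnable@6231b75c rejected from com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor@4ebc3f40[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 65]",
    "        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)",
]

for timestamp, message, exception in error_entries(sample_log):
    print(timestamp, exception)
```

Run against the full worker log (for example `error_entries(open(path))`, with `path` pointing at wherever the worker writes its log), any ERROR entries timestamped before 13:07:22,399 would be the more likely place to find the write-thread failure.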

Could a Kafka expert here point me in the right direction to solve this? Thanks.

【Question Discussion】:

    Tags: apache-kafka google-bigquery apache-kafka-connect confluent-platform confluent-schema-registry


    【Solution 1】:

    For me, the issue was resolved when I switched the Bigquery Sink Connector from version 1.6.6 to 1.6.1.
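
After swapping connector versions it can help to confirm which version the worker actually loaded. The Kafka Connect REST API lists installed plugins at `GET /connector-plugins`, and a minimal Python sketch of such a check follows. The helper names and `sample_plugins` are hypothetical, and whether the version string the connector reports matches its release number exactly is an assumption.

```python
import json
import urllib.request

CONNECTOR_CLASS = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"


def fetch_plugins(base_url):
    """Fetch the plugin list from a Connect worker, e.g. base_url='http://10.1.0.37:8083'."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)


def installed_versions(plugins, connector_class=CONNECTOR_CLASS):
    """Return the version strings the worker reports for one connector class."""
    return [p.get("version") for p in plugins if p.get("class") == connector_class]


# Hypothetical response, shaped like the REST API output; the values are made up.
sample_plugins = [
    {"class": CONNECTOR_CLASS, "type": "sink", "version": "1.6.1"},
    {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "2.6.0"},
]

print(installed_versions(sample_plugins))
```

Against a live worker, `installed_versions(fetch_plugins("http://10.1.0.37:8083"))` would perform the same check. More than one entry in the result would suggest that both connector versions are still on the plugin path.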

    【Discussion】:
