【Posted】: 2021-11-28 04:03:56
【Question】:
I deployed my custom connector in kafka-connect and registered it with the following configuration:
{
  "name" : "rabbitmq-source-connector",
  "config" : {
    "connector.class" : "net.geli.kafkaconnect.rabbitmqsourceconnector.RabbitMQSourceConnector",
    "tasks.max": "5",
    "kafka.topic" : "kafka-test",
    "rabbitmq.queue": "rabbitmq-test-queue",
    "rabbitmq.prefetch.count": 100,
    "rabbitmq.host": "localhost",
    "rabbitmq.virtual.host": "/",
    "rabbitmq.port": "5672",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.kafka:8081",
    "value.converter.schemas.enable": true,
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
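For reference, a connector definition of this shape is typically registered by POSTing it to the Connect worker's REST API at `/connectors`. The following is only a minimal sketch: it assumes the worker is reachable at `http://localhost:8083` (port 8083 appears in the log output; the host is an assumption), and `build_register_request` and `register_connector` are illustrative helper names, not part of any Kafka library.

```python
import json
import urllib.request

# Assumption: Connect REST endpoint. Port 8083 is taken from the logs, the host is a guess.
CONNECT_URL = "http://localhost:8083"


def build_register_request(connector, base_url=CONNECT_URL):
    """Build a POST /connectors request from a {"name": ..., "config": {...}} dict."""
    # Connector configs are string-to-string maps, so render numbers and
    # booleans as strings ("100", "true") before sending them.
    config = {
        key: (str(value).lower() if isinstance(value, bool) else str(value))
        for key, value in connector["config"].items()
    }
    body = json.dumps({"name": connector["name"], "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(connector, base_url=CONNECT_URL):
    """Send the request and return the HTTP status code (needs a running worker)."""
    with urllib.request.urlopen(build_register_request(connector, base_url)) as resp:
        return resp.status
```

With the JSON above loaded into a dict (for example via `json.load`), `register_connector(connector)` would submit it to the worker. Building the request separately from sending it makes it possible to inspect the payload without a running cluster.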
It works fine, but when we restart the kafka-connect service we get the NullPointerException below. I also tried restarting the Kafka broker, but the error is the same.
2021-10-08 06:07:13,122 ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1]
java.lang.NullPointerException
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:685)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:287)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:154)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:265)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:277)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-10-08 06:07:13,123 INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect) [Thread-10]
2021-10-08 06:07:13,125 INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer) [Thread-10]
2021-10-08 06:07:13,130 INFO Stopped http_8083@39c11e6c{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector) [Thread-10]
2021-10-08 06:07:13,130 INFO node0 Stopped scavenging (org.eclipse.jetty.server.session) [Thread-10]
2021-10-08 06:07:13,131 INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer) [Thread-10]
2021-10-08 06:07:13,131 INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [Thread-10]
2021-10-08 06:07:18,132 INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [Thread-10]
2021-10-08 06:07:18,132 INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect) [Thread-10]
Kafka version: kafka_2.12-2.6.0
I am new to Kafka. I searched Google but had no luck. Could you help me resolve this error?
【Comments】:
-
Did you modify the internal converter settings? Can you share your connect-distributed properties file as part of a minimal reproducible example?
-
@OneCricketeer Thank you very much for the quick reply. It worked after cleaning up the connect-cluster-configs, connect-cluster-status, and connect-cluster-offsets topics. Here are the configuration details for my connector:
```
offset.storage.topic=connect-cluster-offsets
config.storage.topic=connect-cluster-configs
status.storage.topic=connect-cluster-status
group.id=connect-cluster
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
plugin.path=/opt/kafka/plugins/geli,/opt/kafka/plugins
```
-
Could you please edit your question to include that? Or, if you have solved the problem, feel free to post your own answer below.
Tags: apache-kafka apache-kafka-connect
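The cleanup mentioned in the comments (clearing the three internal Connect topics) could be scripted roughly as follows. This is a sketch under stated assumptions, not the poster's actual procedure: it assumes `kafka-topics.sh` is on the `PATH`, that the broker is reachable at the address you pass in (for example `localhost:9092`, which is a guess), and that the Connect worker is stopped first. `build_delete_commands` and `delete_internal_topics` are illustrative names. Note that deleting these topics discards the stored connector configs, offsets, and statuses, so connectors have to be registered again afterwards.

```python
import subprocess

# Topic names taken from the worker properties quoted in the comments.
INTERNAL_TOPICS = [
    "connect-cluster-configs",
    "connect-cluster-offsets",
    "connect-cluster-status",
]


def build_delete_commands(bootstrap_server, topics=INTERNAL_TOPICS, tool="kafka-topics.sh"):
    """Return one kafka-topics.sh --delete command (as an argv list) per topic."""
    return [
        [tool, "--bootstrap-server", bootstrap_server, "--delete", "--topic", topic]
        for topic in topics
    ]


def delete_internal_topics(bootstrap_server):
    """Run the delete commands; raises CalledProcessError if any of them fails."""
    for command in build_delete_commands(bootstrap_server):
        subprocess.run(command, check=True)
```

Calling `delete_internal_topics("localhost:9092")` would remove the three topics; a distributed worker creates its internal topics again on startup if they are missing.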