【Question】: kafka mongo connector is not working as expected
【Posted】: 2020-05-08 19:37:21
【Description】:

I am using Kafka Connect (source) with MongoDB. The Connect worker is running, but it is not writing any data to the Kafka topic. I am using the following configuration file for the source connector:

name=mongo-ff
tasks.max=1
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
database=haifa
collection=alerts
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup

But when I try to consume data from the topic:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix  --from-beginning

I don't get any data.

It seems nothing is being written as expected.

Part of the log stack:

    ... 3 more
[2020-01-22 17:19:52,727] INFO Opened connection [connectionId{localValue:3, serverValue:20}] to localhost:27017 (org.mongodb.driver.connection:71)
[2020-01-22 17:19:52,732] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 3]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2854274} (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,732] INFO Discovered cluster type of STANDALONE (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,733] ERROR Expecting a single STANDALONE, but found more than one.  Removing localhost:27017 from client view of cluster. (org.mongodb.driver.cluster:101)
[2020-01-22 17:19:52,735] INFO Cluster ID: sZ64WgvDRBmrJnawsRJ_7A (org.apache.kafka.clients.Metadata:379)
[2020-01-22 17:19:52,756] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2020-01-22 17:20:02,647] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:02,648] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)

I am also getting this error:
ERROR WorkerSourceTask{id=mongo-fff-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27018, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}, {address=localhost:27019, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}]
    at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.getDescription(AbstractMultiServerCluster.java:54)
    at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:152)
    at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:103)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:284)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:188)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:203)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:128)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.iterator(ChangeStreamIterableImpl.java:123)
    at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:236)
    at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:136)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-01-22 18:29:09,590] ERROR WorkerSourceTask{id=mongo-fff-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-01-22 18:29:09,593] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1145)
[2020-01-22 18:29:19,482] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)

【Comments】:

  • Are there any errors in the worker's log?
  • I'll add the whole log stack to the post

Tags: mongodb apache-kafka apache-kafka-connect


【Solution 1】:

My guess is that your expectation is slightly off.

In your configuration file you have set the topic prefix:

topic.prefix=someprefix

but you are trying to consume from a topic literally named someprefix:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix  --from-beginning

Note that the topic.prefix configuration parameter is not the name of the topic that will be created, but only its prefix:

topic.prefix

Prefix to prepend to table names to generate the name of the Kafka topic to publish data to, or in the case of a custom query, the full name of the topic to publish to.

Type: string

Default: ""

Importance:

So if your database haifa had a table named users, then with your configuration file the topic created would be named someprefixusers (I would suggest using a hyphen, as in topic.prefix=someprefix-, so the final topic name is more readable). You would therefore have to consume the records from that topic:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefixusers  --from-beginning
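Note that the quoted description above comes from the JDBC source connector's documentation, where the prefix is prepended directly to a table name. For the MongoDB source connector, my understanding is that the topic name is instead built from the prefix, database, and collection joined with dots, so the config in the question should produce someprefix.haifa.alerts. A minimal sketch of that naming scheme (mongo_source_topic is a hypothetical helper for illustration, not part of any library):

```python
def mongo_source_topic(prefix: str, database: str, collection: str) -> str:
    """Hypothetical helper mirroring the MongoDB source connector's
    default naming scheme: <topic.prefix>.<database>.<collection>."""
    # Empty parts (e.g. no prefix configured) are simply skipped.
    return ".".join(p for p in (prefix, database, collection) if p)

# With the values from the question's config file:
print(mongo_source_topic("someprefix", "haifa", "alerts"))
# someprefix.haifa.alerts
```

If in doubt, list the topics that actually exist on the broker and consume from whichever one the connector created.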

EDIT:

You are getting a Connection Refused error, which means your MongoDB is either not up and running, or you are not connecting to the database correctly.

First, make sure mongodb is up and running by connecting to it with the mongo client:

mongo mongoHost:mongoPort/dbname

Second, connection.uri seems to be missing from your connector configuration file.
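A minimal sketch of how the config from the question might look with an explicit connection string (the URI below is an assumption for a local MongoDB instance; adjust host, port, and options to your deployment — and note that change streams, which the source connector relies on, require MongoDB to run as a replica set, not a standalone):

```properties
name=mongo-ff
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Assumed local MongoDB; change streams require a replica set,
# e.g. mongodb://localhost:27017/?replicaSet=rs0
connection.uri=mongodb://localhost:27017
database=haifa
collection=alerts
topic.prefix=someprefix
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```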

【Comments】:

  • You mean a collection, don't you? Not a table. But that doesn't work either
  • @scalacode Can you share your complete list of topics?
  • Well, I deleted all the topics and restarted the connect worker, and I noticed it does not create any new topic at the Kafka level; bin/kafka-topics.sh --list --zookeeper localhost:2181 returns only __consumer_offsets
  • @scalacode Could you post the full error trace reported in the log?
  • Done, I added it at the bottom of my previous message