[Posted]: 2020-05-08 19:37:21
[Question]:
I'm using Kafka Connect (as a source) with MongoDB. The Connect worker is running, but it is not writing any data to the Kafka topic. I'm using the following configuration file for the source connector:
name=mongo-ff
tasks.max=1
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
database=haifa
collection=alerts
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
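For context, the properties above do not set a `connection.uri`, so the connector falls back to its default of `mongodb://localhost:27017`; also, MongoDB change streams only work against a replica set or sharded cluster, not a standalone server. A minimal sketch of a complete properties file, assuming a hypothetical single-node replica set named `rs0` on the default port (the replica set name is an assumption, not from my setup):

```properties
name=mongo-ff
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Assumption: a single-node replica set "rs0" listening on the default port
connection.uri=mongodb://localhost:27017/?replicaSet=rs0
database=haifa
collection=alerts
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
```

As far as I understand, with this configuration the connector would publish to the topic `someprefix.haifa.alerts` (prefix, database, collection), so the consumer would need that full topic name rather than just the prefix.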
But when I try to consume the data from the topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic someprefix --from-beginning
I get no data.
It seems nothing is being written as expected.
Part of the log stack is:
... 3 more
[2020-01-22 17:19:52,727] INFO Opened connection [connectionId{localValue:3, serverValue:20}] to localhost:27017 (org.mongodb.driver.connection:71)
[2020-01-22 17:19:52,732] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 3]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2854274} (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,732] INFO Discovered cluster type of STANDALONE (org.mongodb.driver.cluster:71)
[2020-01-22 17:19:52,733] ERROR Expecting a single STANDALONE, but found more than one. Removing localhost:27017 from client view of cluster. (org.mongodb.driver.cluster:101)
[2020-01-22 17:19:52,735] INFO Cluster ID: sZ64WgvDRBmrJnawsRJ_7A (org.apache.kafka.clients.Metadata:379)
[2020-01-22 17:19:52,756] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2020-01-22 17:20:02,647] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:02,648] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-22 17:20:12,649] INFO WorkerSourceTask{id=mongo-fff-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
I'm also getting this error:
ERROR WorkerSourceTask{id=mongo-fff-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27018, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}, {address=localhost:27019, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connexion refusée (Connection refused)}}]
at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
at com.mongodb.internal.connection.AbstractMultiServerCluster.getDescription(AbstractMultiServerCluster.java:54)
at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:152)
at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:103)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:284)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:188)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:203)
at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:128)
at com.mongodb.client.internal.ChangeStreamIterableImpl$1.iterator(ChangeStreamIterableImpl.java:123)
at com.mongodb.kafka.connect.source.MongoSourceTask.createCursor(MongoSourceTask.java:236)
at com.mongodb.kafka.connect.source.MongoSourceTask.start(MongoSourceTask.java:136)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-01-22 18:29:09,590] ERROR WorkerSourceTask{id=mongo-fff-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-01-22 18:29:09,593] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1145)
[2020-01-22 18:29:19,482] INFO WorkerSourceTask{id=mongo-fff-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[Comments]:
-
Are there any errors in the worker's logs?
-
I'll add the full log stack to the post.
Tags: mongodb apache-kafka apache-kafka-connect