【问题标题】:MongoDB change stream replica set recoveryMongoDB变更流副本集恢复
【发布时间】:2018-05-02 17:29:59
【问题描述】:

我使用 Spring 实现了 MongoDB 更改流,当副本集主节点启动时它可以正常工作。

@Service
public class ChangeEventService {
    private static final Logger logger = LoggerFactory.getLogger(ChangeEventService.class);
    private final MongoClient mongoClient;
    public ChangeEventService(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }
    @PostConstruct
    public void subscribe() {
        MongoDatabase db = mongoClient.getDatabase("experiment");
        MongoCollection<Document> collection = db.getCollection("debug");
        Block<ChangeStreamDocument<Document>> printBlock = changeStreamDocument -> {
            logger.info("Received: {}", changeStreamDocument.getFullDocument().toString());
            BsonDocument resumeToken = changeStreamDocument.getResumeToken();
        };
        collection.watch().forEach(printBlock);
        logger.info("Consumer is ready to process");
    }
}

然后我关闭了副本集的主节点。我期待更改流等待副本集选择新的主节点并继续获取数据更改。实际行为是应用程序崩溃。

从日志中我可以看到与主要 (27000) 的连接已关闭,这是预期的,然后它似乎尝试打开与辅助 (27001) 之一的连接,但由于池已关闭。

从文档中:“更改流绑定到一个集合,并且更改流文档使用游标进行迭代。只要与 MongoDB 部署的连接保持打开并且集合保持打开状态,该游标就会一直保持打开状态,直到它被显式关闭存在。”

2018-05-02 12:03:03.424  INFO 9560 --- [           main] c.e.m.service.ChangeEventService         : Received: Document{{_id=5ae98cd7dcc8921c94d5f9e5, _class=com.mongodb.BasicDBObject, uuid=4f836d00-efc3-4d48-956a-af4dbfed90e7, now=Wed May 02 12:03:03 CEST 2018}}
2018-05-02 12:03:06.500  WARN 9560 --- [           main] org.mongodb.driver.connection            : Got socket exception on connection [connectionId{localValue:4, serverValue:8}] to localhost:27000. All connections to localhost:27000 will be closed.
2018-05-02 12:03:06.501  INFO 9560 --- [           main] org.mongodb.driver.connection            : Closed connection [connectionId{localValue:4, serverValue:8}] to localhost:27000 because there was a socket exception raised by this connection.
2018-05-02 12:03:07.502  INFO 9560 --- [           main] org.mongodb.driver.connection            : Closed connection [connectionId{localValue:6}] to localhost:27000 because there was a socket exception raised by this connection.
2018-05-02 12:03:07.504  WARN 9560 --- [           main] ationConfigEmbeddedWebApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'changeEventService': Invocation of init method failed; nested exception is com.mongodb.MongoSocketOpenException: Exception opening socket
2018-05-02 12:03:07.505  INFO 9560 --- [localhost:27000] org.mongodb.driver.cluster               : Exception in monitor thread while connecting to server localhost:27000
com.mongodb.MongoSocketOpenException: Exception opening socket
    at com.mongodb.connection.SocketStream.open(SocketStream.java:62) ~[mongodb-driver-core-3.6.3.jar:na]
    at com.mongodb.connection.InternalStreamConnection.open(InternalStreamConnection.java:126) ~[mongodb-driver-core-3.6.3.jar:na]
    at com.mongodb.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:128) ~[mongodb-driver-core-3.6.3.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_161]
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_161]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_161]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_161]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_161]
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_161]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_161]
    at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_161]
    at com.mongodb.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:59) ~[mongodb-driver-core-3.6.3.jar:na]
    at com.mongodb.connection.SocketStream.open(SocketStream.java:57) ~[mongodb-driver-core-3.6.3.jar:na]
    ... 3 common frames omitted
2018-05-02 12:03:07.507  INFO 9560 --- [           main] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:7, serverValue:181}] to localhost:27001
2018-05-02 12:03:07.508  INFO 9560 --- [           main] org.mongodb.driver.connection            : Closed connection [connectionId{localValue:7, serverValue:181}] to localhost:27001 because the pool has been closed.
2018-05-02 12:03:07.511  INFO 9560 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2

【问题讨论】:

  • 快速提问。如果您尝试另一个列表并将可尾光标直接放在 oplog 上,那么会发生什么?我有点期待同样的事情,就像我期待的那样,它是您期望检查的核心位置,根本不会发生在“驱动程序中”。我的直觉说您需要自己捕获异常并恢复流,但如果它实际上与基本可尾光标一起工作,那么该核心代码就在驱动程序中的某个地方,我愿意在这一点上调用“错误”。但不是以前,因为我怀疑这是故意的。

标签: spring mongodb mongo-java-driver


【解决方案1】:

这里有两件事:

在 com.mongodb.connection.SocketStream.open(SocketStream.java:57) ~[mongodb-driver-core-3.6.3.jar:na]

MongoDB Java driver v3.6.3 存在一个错误,当尝试终止现有游标失败时,change streams 游标不会恢复。这在JAVA-2821 中进行了描述,并在版本 3.7.0 及更高版本中修复。

collection.watch().forEach(printBlock);

watch() 方法实际上并不联系服务器,而是您应该使用迭代器方法。例如:

MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();

ChangeStreamDocument<Document> next = cursor.next();

while(cursor.hasNext()){
    next = cursor.next();
    System.out.println(next);
}

另请参阅Spec: Resumable Error 以了解被认为可恢复的错误的定义。

【讨论】:

  • 我使用 3.7.0-rc0 版本进行了测试,使用我的原始代码可以正常工作
猜你喜欢
  • 2022-01-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-20
相关资源
最近更新 更多