【问题标题】:Spring data mongo resume token for MessageListenerContainerMessageListenerContainer 的 Spring 数据 mongodb 恢复令牌
【发布时间】:2021-07-09 00:01:42
【问题描述】:

您好,我正在尝试从 mongo oplog 实现 MessageListener,它应该从上次停止的流式文档中检索文档。 目前我的代码设置如下。不知道如何从最后一个文档中检索 resumeToken 并设置它,以便如果侦听器应用程序出现故障并重新联机,它应该在最后一次读取后读取。

    @Bean
    MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate, @Qualifier("candidateMessageListener") MessageListener documentMessageListener)
    {
    Executor executor = Executors.newSingleThreadExecutor();
    MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, executor)
    {
        @Override
        public boolean isAutoStartup()
        {
        return true;
        }
    };
    ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
                    .collection("candidate") // The name of the collection to listen to , Do not specify the default listening database
                    .filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete")))) // Filter the types of operations that need to be monitored , You can specify filter conditions according to your needs
                    .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                    // When not set , When the document is updated , Only information about the changed fields will be sent , Set up UPDATE_LOOKUP All information of the document will be returned
                    .build();
    messageListenerContainer.register(request, Candidate.class);
    return messageListenerContainer;
    }

【问题讨论】:

    标签: mongodb spring-data-mongodb message-listener


    【解决方案1】:

    您可以使用ChangeStreamEvent 上的getResumeToken()getTimestamp() 方法来获取此信息。详情请见Reference Documentation - Resuming Change Streams

    【讨论】:

    • 我看到了这个,但不确定在哪里设置 getResumeToken 以及如何检索它。你能告诉我一个实现吗?
    • ChangeStreamTests 中有两种变体的用法。
    猜你喜欢
    • 2020-11-11
    • 2019-07-10
    • 2020-12-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多