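【Posted】:2021-07-09 00:01:42
【Question】:
Hello, I am trying to implement a MessageListener on the mongo oplog that resumes retrieving documents from the point where the stream last stopped. My code is currently set up as shown below. I don't know how to retrieve the resumeToken from the last document and set it so that, if the listener application goes down and comes back online, it continues reading after the last document it read.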
@Bean
MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate, @Qualifier("candidateMessageListener") MessageListener documentMessageListener)
{
    // A single thread is enough to run the change stream task
    Executor executor = Executors.newSingleThreadExecutor();
    MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, executor)
    {
        @Override
        public boolean isAutoStartup()
        {
            // Start listening as soon as the application context is ready
            return true;
        }
    };
    ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
        // Name of the collection to listen to; no database is specified, so the template's default database is used
        .collection("candidate")
        // Restrict the operation types to listen for; adjust the filter conditions as needed
        .filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete"))))
        // Without this setting, an update event carries only the changed fields;
        // with UPDATE_LOOKUP the full document is returned
        .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
        .build();
    messageListenerContainer.register(request, Candidate.class);
    return messageListenerContainer;
}
【Discussion】:
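One possible direction for the resume-token part of the question, offered only as a sketch: persist the token of the last processed event somewhere durable and read it back when the application starts. The class below is a hypothetical helper (ResumeTokenStore and its file-based storage are assumptions, not part of Spring Data MongoDB) that uses only the JDK and stores the token as a JSON string.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

/** Hypothetical helper: keeps the most recent change stream resume token (as JSON) in a file. */
final class ResumeTokenStore {
    private final Path file;

    ResumeTokenStore(Path file) {
        this.file = file;
    }

    /** Saves the token, replacing any previously stored one. */
    synchronized void save(String tokenJson) {
        try {
            // Write to a sibling temp file first, then move it over the old file,
            // which reduces the chance of leaving a half-written token behind.
            Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
            Files.write(tmp, tokenJson.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the last saved token, or empty if nothing has been saved yet. */
    synchronized Optional<String> load() {
        try {
            if (!Files.exists(file)) {
                return Optional.empty();
            }
            String json = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return json.isEmpty() ? Optional.empty() : Optional.of(json);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Assuming the listener receives a Message whose raw payload is a ChangeStreamDocument, the token could be saved inside onMessage via message.getRaw().getResumeToken().toJson(), after the event has been handled successfully. At startup, a stored value could be turned back into a token with BsonDocument.parse(json) and passed to the request builder's resumeToken(...) before build(). Resuming presumably only works while the stored position is still present in the oplog, so a fallback for an expired token may be needed; none of this has been checked against the poster's library version.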
Tags: mongodb spring-data-mongodb message-listener