【发布时间】:2018-10-01 06:23:33
【问题描述】:
基本上,我正在 Spring Boot 中制作队列处理器,并希望使用 Reactor 进行异步。我已经使一个函数需要永远循环,因为它是从队列中拉出然后将项目标记为已处理的函数。
这是有效的阻止版本订阅返回单声道
while(true) {
manager.Subscribe().block()
}
我不知道如何把它变成一个 Flux 我已经查看了一个区间、生成、创建等,如果不调用 block() 就无法工作
这是我尝试过的一个示例
Flux.generate(() -> manager,
(state, sink) -> {
state.Subscribe().block();
sink.next("done");
return state;
}));
作为 Reactor 的新手,我无法找到任何关于循环和同步处理 Monos 而不会阻塞的内容。
以下是使用 AWS Java SDK v2 的订阅方法的作用:
public Mono Subscribe() {
return Mono.fromFuture(_client.receiveMessage(ReceiveMessageRequest.builder()
.waitTimeSeconds(10)
.queueUrl(_queueUrl)
.build()))
.filter(x -> x.messages() != null)
.flatMap(x -> Mono.when(x.messages()
.stream()
.map(y -> {
_log.warn(y.body());
return Mono.fromFuture(_client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(_queueUrl)
.receiptHandle(y.receiptHandle())
.build()));
}).collect(Collectors.toList())));
}
基本上,我只是轮询一个 SQS 队列,删除消息然后我想再做一次。这对我来说只是探索性的。
谢谢!
【问题讨论】:
-
那个经理对象是什么?它从何而来?你能展示完整的课程吗?
-
感谢您的回复。它由 Spring Boot DI 管理。除了返回一个我想确保在再次运行之前完成的 Mono 之外,我不确定它的作用。我添加了订阅的一些细节
标签: java project-reactor