【问题标题】:Reactor: function creating Monos to FluxReactor:创建 Monos 到 Flux 的函数
【发布时间】:2018-10-01 06:23:33
【问题描述】:

基本上,我正在 Spring Boot 中制作队列处理器,并希望使用 Reactor 进行异步。我已经使一个函数需要永远循环,因为它是从队列中拉出然后将项目标记为已处理的函数。

这是有效的阻止版本订阅返回单声道

while(true) {
    manager.Subscribe().block()
}

我不知道如何把它变成一个 Flux 我已经查看了一个区间、生成、创建等,如果不调用 block() 就无法工作

这是我尝试过的一个示例

Flux.generate(() -> manager,
    (state, sink) -> {
        state.Subscribe().block();
        sink.next("done");
        return state;
    }));

作为 Reactor 的新手,我无法找到任何关于循环和同步处理 Monos 而不会阻塞的内容。

以下是使用 AWS Java SDK v2 的订阅方法的作用:

public Mono Subscribe() {
    return Mono.fromFuture(_client.receiveMessage(ReceiveMessageRequest.builder()
            .waitTimeSeconds(10)
            .queueUrl(_queueUrl)
            .build()))
            .filter(x -> x.messages() != null)
            .flatMap(x -> Mono.when(x.messages()
                    .stream()
                    .map(y -> {
                        _log.warn(y.body());
                        return Mono.fromFuture(_client.deleteMessage(DeleteMessageRequest.builder()
                                .queueUrl(_queueUrl)
                                .receiptHandle(y.receiptHandle())
                                .build()));
                    }).collect(Collectors.toList())));
}

基本上,我只是轮询一个 SQS 队列,删除消息然后我想再做一次。这对我来说只是探索性的。

谢谢!

【问题讨论】:

  • 那个经理对象是什么?它从何而来?你能展示完整的课程吗?
  • 感谢您的回复。它由 Spring Boot DI 管理。除了返回一个我想确保在再次运行之前完成的 Mono 之外,我不确定它的作用。我添加了订阅的一些细节

标签: java project-reactor


【解决方案1】:

您需要两件事:一种在循环中订阅的方法,以及一种确保在每次迭代中有效调用Subscribe() 方法的方法(因为需要重新创建Future)。

repeat() 是一个内置运算符,一旦源完成,它将重新订阅其源。如果源出错,则重复循环停止。最简单的变体继续这样做Long.MAX_VALUE 次。

唯一的问题是,在您的情况下,Subscribe() 中的Mono 必须在每次迭代时重新创建。

为此,您可以将 Subscribe() 调用包装在 defer 中:它会在每次发生新订阅时重新调用该方法,包括每次重复尝试:

Flux<Stuff> repeated = Mono
    .defer(manager::Subscribe)
    .repeat();

【讨论】:

  • 完美运行,谢谢!我不知道为什么我没有注意到 API 文档中有重复。
猜你喜欢
  • 1970-01-01
  • 2021-07-11
  • 2019-09-27
  • 2017-12-17
  • 1970-01-01
  • 1970-01-01
  • 2021-11-23
  • 2017-06-17
  • 1970-01-01
相关资源
最近更新 更多