【问题标题】:How to iterate Flux and mix with Mono如何迭代 Flux 并与 Mono 混合
【发布时间】:2018-03-26 10:04:30
【问题描述】:

当我应该向用户发送电子邮件时,我有一个用例。 首先,我创建电子邮件正文。

Mono<String> emailBody = ...cache();

然后我选择用户并将电子邮件发送给他们:

Flux.fromIterable(userRepository.findAllByRole(Role.USER))
            .map(User::getEmail)
            .doOnNext(email -> sendEmail(email, emailBody.block(), massSendingSubject))
            .subscribe();

我不喜欢的东西

  1. 没有 cache() 方法 emailBody Mono 在每个迭代步骤中计算。
  2. 要获取 emailBody 值,我使用 emailBody.block() 但可能有一种反应方式而不是在 Flux 流中调用 block 方法?

【问题讨论】:

    标签: java reactive-programming project-reactor spring-webflux


    【解决方案1】:

    此代码示例中有几个问题。 我假设这是一个响应式 Web 应用程序。

    首先,不清楚您是如何创建电子邮件正文的;您是从数据库还是远程服务中获取内容?如果它主要受 CPU 限制(而不是 I/O),那么您不需要将其包装成响应式类型。现在,如果它应该包装在 Publisher 中,并且所有用户的电子邮件内容都相同,那么使用 cache 运算符是一个不错的选择。

    另外,Flux.fromIterable(userRepository.findAllByRole(Role.USER)) 建议您从响应式上下文调用阻塞存储库。

    您应该永远不要doOn*** 运算符中执行繁重的 I/O 操作。这些是为记录或轻微的副作用操作而设计的。您需要.block() 的事实是您将阻塞整个反应管道的另一个线索。

    最后一个:你不应该在 Web 应用程序的任何地方调用subscribe。如果这绑定到 HTTP 请求,那么您基本上会触发反应式管道,而不能保证资源或完成。调用subscribe 会触发管道,但不会等到它完成(此方法返回Disposable)。

    一个更“典型”的示例如下所示:

    Flux<User> users = userRepository.findAllByRole(Role.USER);
    String emailBody = emailContentGenerator.createEmail();
    
    
    // sendEmail() should return Mono<Void> to signal when the send operation is done
    Mono<Void> sendEmailsOperation = users
         .flatMap(user -> sendEmail(user.getEmail(), emailBody, subject))
         .then();
    
    // something else should subscribe to that reactive type,
    // you could plug that as a return value of a Controller for example
    

    如果您不知何故受困于阻塞组件(例如 sendEmail 之一),您应该将这些阻塞操作安排在特定的调度程序上,以避免阻塞整个反应式管道。为此,请查看Schedulers section on the reactor reference documentation

    【讨论】:

    • 感谢您的回答。了解doOn***非常有用。电子邮件正文是通过从数据库中获取新项目并进行一些排序/分组操作生成的,因此它是 Mono。在这种情况下,您将如何修改 sendEmailsOperation?
    • sendEmail 在这种情况下可能应该采用 Mono 或更好的 Flux 作为参数。
    • sendEmail - 需要 Java 类型的 3d 方 API。你也不能像 user.map(u-> sendEmail(emailBodyMono)) 这样在 Flux 中调用 Mono,因为应该有人启动 emailBodyMono 流。我通过调用 block() 方法来做到这一点。
    • 如果您到处调用阻塞 I/O API,那么使用 WebFlux 几乎没有什么好处。最后,您的大部分代码都需要在特殊的调度程序上进行调度。
    • 我理解,但我们有很多遗留的 Java 阻塞库。而且我正在尝试将响应式代码与旧代码混合,因为它比在任何地方使用旧代码更好。
    猜你喜欢
    • 2021-09-30
    • 2020-05-04
    • 1970-01-01
    • 2019-01-07
    • 2021-11-23
    • 1970-01-01
    • 2020-01-22
    • 2021-07-31
    • 1970-01-01
    相关资源
    最近更新 更多