【问题标题】:Combining result from Flux to result from Mono将 Flux 的结果与 Mono 的结果相结合
【发布时间】:2017-04-19 21:48:19
【问题描述】:

我开始使用 Project reactor,其中一个让我苦恼的地方是如何将来自 Mono 的东西与 Flux 结合起来。这是我的用例:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.

现在我想要实现的是:

如何结合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。

有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用像 Zip 这样的东西,但它会从源代码进行一对一的映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组,但需要将多个用户添加到该组中。返回 Mono&lt;List&lt;Users&gt; 然后将 zip 运算符与单声道一起使用并提供此处提到的组合器是个好主意吗
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)

【问题讨论】:

    标签: spring-webflux project-reactor reactive-streams


    【解决方案1】:

    这个 1 到 N 的映射听起来类似于我在这里回答的一个问题:

    Can you Flux.zip a mono and a flux and and repeat the mono value for every flux value?

    万一该链接断开,这又是答案。我认为这种方法不会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 包含缓慢的操作,那么拥有一些缓存层可能会很好。

    假设您有这样的通量和单声道:

     // a flux that contains 6 elements.
     final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));
    
     // a mono of 1 element.
     final Mono<String> groupLabel = Mono.just("someGroupLabel");
    

    首先,我将向您展示尝试压缩我尝试的 2 的错误方法,我认为其他人会尝试:

     // wrong way - this will only emit 1 event 
     final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
             .zipWith(groupLabel);
    
     // you'll see that onNext() is only called once, 
     //     emitting 1 item from the mono and first item from the flux.
     wrongWayOfZippingFluxToMono
             .log()
             .subscribe();
    

     // this is how to zip up the flux and mono how you'd want, 
     //     such that every time the flux emits, the mono emits. 
     final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
             .flatMap(userId -> Mono.just(userId)
                     .zipWith(groupLabel));
    
     // you'll see that onNext() is called 6 times here, as desired. 
     correctWayOfZippingFluxToMono
             .log()
             .subscribe();
    

    【讨论】:

      【解决方案2】:

      我认为 Flux.combineLatest 静态方法可以帮助您:因为您的 Mono 只发出 1 个元素,该元素将始终是与来自 Flux 的每个传入值相结合的元素。

      Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                         groupMono, userFlux);
      

      【讨论】:

      • 但我最终使用了Flux.zip(groupMono, userMono, BiFunction),因为在我的情况下,组模型内的底层数据结构是 HashSet(它保存属于组的用户)并且添加来自的用户不是线程安全的组中的用户流量,以反应的方式。因此,我使用 BiFunction 在该单独方法中填充组中的用户。感谢您的帮助,非常感谢您的帮助!
      • 也许我没有得到任何东西,但如果我使用 combineLatestFluxMono 因为这不可能我“放弃”@ 中的第一个事件987654329@ 如果Mono 的第一个事件稍后到来?例如:用户存储库非常快,在组存储库返回一个组之前已经返回了 10 个用户,那么我希望combineLatest 后面的流中的第一个第一个元素是第 10 个用户与组的组合,因为combineLatest 实际上结合了 latest 元素。有人可以帮我解决这个问题吗?
      【解决方案3】:
      Flux.fromIterable(List.of(1,2,3,4,5,6))
            .zipWith(Mono.just("groupLabel").cache().repeat())
      

      将您的标签压缩到通量发出的每个值

      【讨论】:

        【解决方案4】:

        为其他人添加答案,我使用了Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)。我使用了这种方法而不是 @Simon 建议的方法,因为拥有用户的底层组是 HashSet 并且以反应方式添加用户将不是线程安全的。但如果你有一个线程安全的数据结构,我会使用@Simon 的建议。

        【讨论】:

          猜你喜欢
          • 2020-05-04
          • 2020-09-25
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-09-21
          • 2017-10-19
          • 1970-01-01
          相关资源
          最近更新 更多