【问题标题】:Combine Mono with Flux将 Mono 与 Flux 相结合
【发布时间】:2020-05-04 07:58:53
【问题描述】:

我想创建一个服务,将来自两个反应源的结果结合起来。 一个正在生产 Mono,另一个正在生产 Flux。对于合并,我需要为每个发射的通量使用相同的单声道值。

现在我有这样的东西

Flux.zip(
   service1.getConfig(), //produces flux
   service2.getContext() //produces mono
           .cache().repeat()
)

这给了我我需要的东西,

  • service2 只被调用一次
  • 为每个配置提供上下文
  • 产生的助焊剂具有与配置一样多的元素

但我注意到 repeat() 在缓存上下文后会发出大量元素。这是一个问题吗?

我可以做些什么来将重复次数限制为接收到的配置数量,但仍然同时请求两个? 或者这不是问题,我可以放心地忽略那些额外发出的元素?

我尝试使用combineLatest,但根据时间的不同,我的一些配置元素可能会丢失并且未被处理。

编辑

查看@Ricard Kollcaku 的建议,我创建了示例测试,说明为什么这不是我想要的。

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class SampleTest
{
    Logger LOG = LoggerFactory.getLogger(SampleTest.class);
    AtomicLong counter = new AtomicLong(0);

    Flux<String> getFlux()
    {
        return Flux.fromStream(() -> {
            LOG.info("flux started");
            sleep(1000);
            return Stream.of("a", "b", "c");
        }).subscribeOn(Schedulers.parallel());
    }

    Mono<String> getMono()
    {
        return Mono.defer(() -> {
            counter.incrementAndGet();
            LOG.info("mono started");
            sleep(1000);
            return Mono.just("mono");
        }).subscribeOn(Schedulers.parallel());
    }

    private void sleep(final long milis)
    {
        try
        {
            Thread.sleep(milis);
        }
        catch (final InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    @Test
    void test0()
    {
        final Flux<String> result = Flux.zip(
                getFlux(),
                getMono().cache().repeat()
                         .doOnNext(n -> LOG.warn("signal on mono", n)),
                (s1, s2) -> s1 + " " + s2
        );

        assertResults(result);
    }

    @Test
    void test1()
    {
        final Flux<String> result =
                getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
                        (s1, s2) -> s1 + " " + s2));
        assertResults(result);
    }

    @Test
    void test2()
    {
        final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
        assertResults(result);
    }

    void assertResults(final Flux<String> result)
    {
        final Flux<String> flux = result;

        StepVerifier.create(flux)
                    .expectNext("a mono")
                    .expectNext("b mono")
                    .expectNext("c mono")
                    .verifyComplete();

        Assertions.assertEquals(1L, counter.get());
    }

查看 test1 和 test2 的测试结果

2020-01-20 12:55:22.542 INFO  [] [] [     parallel-3]  SampleTest  : flux started  
2020-01-20 12:55:24.547 INFO  [] [] [     parallel-4]  SampleTest  : mono started  
2020-01-20 12:55:24.547 INFO  [] [] [     parallel-5]  SampleTest  : mono started  
2020-01-20 12:55:24.548 INFO  [] [] [     parallel-6]  SampleTest  : mono started  

expected: <1> but was: <3>

我需要拒绝你的提议。在这两种情况下 getMono 都是 - 被调用的次数与不断变化的项目一样多 - 在通量的第一个元素到达后​​调用 这些是我想避免的互动。我的服务正在后台发出 http 请求,它们可能很耗时。

我目前的解决方案没有这个问题,但是如果我将记录器添加到我的 zip 中,我会得到这个

2020-01-20 12:55:20.505 INFO  [] [] [     parallel-1]  SampleTest  : flux started  
2020-01-20 12:55:20.508 INFO  [] [] [     parallel-2]  SampleTest  : mono started  
2020-01-20 12:55:21.523 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.528 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.535 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  

如您所见,将cache().repeat() 组合在一起会发出很多元素,我想知道这是否是一个问题,如果是,那么如何避免它(但保持单次调用和并行调用)。

【问题讨论】:

  • 为什么不通过 Mono 进行平面映射,例如像这样Mono.just(1).flux().flatMap(v1 -&gt; Flux.just(2, 3).map(v2 -&gt; v1 + v2));
  • 你的问题是这个?流 1 发出 1,2,3,4 流 2 仅发出 a。当你想处理 1,a - 2,a - 3,a - 4,a 时,你只能处理 1,a 吗?
  • @efan - 这会起作用,但是(根据我对反应的理解)我只会在Mono 发出一个值时向Flux.just(2, 3) 发出请求。我想同时启动这两个请求,并在结果可用时合并它们。 @RicardKollcaku - 不完全是。我能够处理1,a 2,a 3,a ...,但在实现它的过程中,由于cache().repeat() 的工作方式,我产生了荒谬的a
  • @RobertOzga 通过您的编辑,我更好地解释了为什么您不会多次创建单声道。我已经编辑了答案,所以 Mono 只会被创建 1 次

标签: java project-reactor reactor


【解决方案1】:

我认为您想要实现的目标可以通过 Flux.join 完成

下面是一些示例代码:

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
        Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
    return x + y;
}).collectList().block();

System.out.println(list);

【讨论】:

    【解决方案2】:

    Project Reactor 和 RxJava 等库尝试提供尽可能多的功能组合,但不提供对组合功能工具的访问。因此,总是有一些极端情况没有被覆盖。

    据我所知,我自己的 DF4J 是唯一提供组合功能的异步库。例如,用户可以这样压缩 Flux 和 Mono:(当然,这个类不是 DF4J 本身的一部分):

    import org.df4j.core.dataflow.Actor;
    import org.df4j.core.port.InpFlow;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    abstract class ZipActor<T1, T2> extends Actor {
        InpFlow<T1> inpFlow = new InpFlow<>(this);
        InpFlow<T2> inpScalar = new InpFlow<>(this);
    
        ZipActor(Flux<T1> flux, Mono<T2> mono) {
            flux.subscribe(inpFlow);
            mono.subscribe(inpScalar);
        }
    
        @Override
        protected void runAction() throws Throwable {
            if (inpFlow.isCompleted()) {
                stop();
                return;
            }
            T1 element1 = inpFlow.removeAndRequest();
            T2 element2 = inpScalar.current();
            runAction(element1, element2);
        }
    
        protected abstract void runAction(T1 element1, T2 element2);
    }
    

    这就是它的使用方法:

    @Test
    public void ZipActorTest() {
        Flux<Integer> flux = Flux.just(1,2,3);
        Mono<Integer> mono = Mono.just(5);
        ZipActor<Integer, Integer> actor = new ZipActor<Integer, Integer>(flux, mono){
            @Override
            protected void runAction(Integer element1, Integer element2) {
                System.out.println("got:"+element1+" and:"+element2);
            }
        };
        actor.start();
        actor.join();
    }
    

    控制台输出如下:

    got:1 and:5
    got:2 and:5
    got:3 and:5
    

    【讨论】:

    • 感谢他的建议,这可能会解决我的问题。不幸的是,为了能够将您的库包含在我们的项目中(例如支持/补丁等问题),我需要与我公司的“公司政策”作斗争:(如果还包括解决一个问题的外部库可能不是最好的方法还有其他(不是那么好但仍然)可行的解决方案。不过,感谢您的发帖,我会牢记您的 lib 以备将来使用 :)
    • 我不确定我是否理解您从这里来的地方 - reactor 确实提供了通过 Flux.zip 等方法组合流的功能。为了透明起见,您还应该注意您是 DF4J 的作者。
    【解决方案3】:

    你可以通过一个简单的改变来做到这一点

        getFlux()
        .flatMap(s -> Mono.zip(Mono.just(s),getMono(), (s1, s2) -> s1+" "+s2))
        .subscribe(System.out::println);
    
    Flux<String> getFlux(){
        return Flux.just("a","b","c");
    }
    Mono<String> getMono(){
        return  Mono.just("mono");
    }
    

    如果你想使用 zip 但你可以使用 flatmap 获得相同的结果

          getFlux()
                .flatMap(s -> getMono()
                        .map((s1 -> s + " " + s1)))
                .subscribe(System.out::println);
    }
    
    Flux<String> getFlux() {
        return Flux.just("a", "b", "c");
    }
    
    Mono<String> getMono() {
        return Mono.just("mono");
    }
    

    两个结果都是: 单声道 b 单声道 c单声道

    编辑 好的,现在我更好地理解了。你可以试试这个解决方案。

       getMono().
                flatMapMany(s -> getFlux().map(s1 -> s1 + " " + s))
                .subscribe(System.out::println);
    
    
    Flux<String> getFlux() {
        return Flux.defer(() -> {
            System.out.println("init flux");
            return Flux.just("a", "b", "c");
        });
    }
    
    Mono<String> getMono() {
        return Mono.defer(() -> {
            System.out.println("init Mono");
            return Mono.just("sss");
        });
    }
    

    【讨论】:

    • 感谢您的回复,但这不起作用。我已经编辑了我的问题,以更好地解释我的期望以及为什么您的提案不符合我的需求。
    • 这样效果更好 - 单次调用单声道。但代码仍在等待 mono 在调用通量请求之前发出结果。我宁愿同时启动两者(如果一个失败则取消其他请求的压缩逻辑在这里也很好)但我看起来这是不可能的
    猜你喜欢
    • 1970-01-01
    • 2019-09-21
    • 1970-01-01
    • 2021-09-30
    • 1970-01-01
    • 2019-01-07
    • 1970-01-01
    • 2021-11-23
    • 2018-03-26
    相关资源
    最近更新 更多