【发布时间】:2020-05-04 07:58:53
【问题描述】:
我想创建一个服务,将来自两个反应源的结果结合起来。 一个正在生产 Mono,另一个正在生产 Flux。对于合并,我需要为每个发射的通量使用相同的单声道值。
现在我有这样的东西
Flux.zip(
service1.getConfig(), //produces flux
service2.getContext() //produces mono
.cache().repeat()
)
这给了我我需要的东西,
- service2 只被调用一次
- 为每个配置提供上下文
- 产生的助焊剂具有与配置一样多的元素
但我注意到 repeat() 在缓存上下文后会发出大量元素。这是一个问题吗?
我可以做些什么来将重复次数限制为接收到的配置数量,但仍然同时请求两个? 或者这不是问题,我可以放心地忽略那些额外发出的元素?
我尝试使用combineLatest,但根据时间的不同,我的一些配置元素可能会丢失并且未被处理。
编辑
查看@Ricard Kollcaku 的建议,我创建了示例测试,说明为什么这不是我想要的。
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class SampleTest
{
Logger LOG = LoggerFactory.getLogger(SampleTest.class);
AtomicLong counter = new AtomicLong(0);
Flux<String> getFlux()
{
return Flux.fromStream(() -> {
LOG.info("flux started");
sleep(1000);
return Stream.of("a", "b", "c");
}).subscribeOn(Schedulers.parallel());
}
Mono<String> getMono()
{
return Mono.defer(() -> {
counter.incrementAndGet();
LOG.info("mono started");
sleep(1000);
return Mono.just("mono");
}).subscribeOn(Schedulers.parallel());
}
private void sleep(final long milis)
{
try
{
Thread.sleep(milis);
}
catch (final InterruptedException e)
{
e.printStackTrace();
}
}
@Test
void test0()
{
final Flux<String> result = Flux.zip(
getFlux(),
getMono().cache().repeat()
.doOnNext(n -> LOG.warn("signal on mono", n)),
(s1, s2) -> s1 + " " + s2
);
assertResults(result);
}
@Test
void test1()
{
final Flux<String> result =
getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
(s1, s2) -> s1 + " " + s2));
assertResults(result);
}
@Test
void test2()
{
final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
assertResults(result);
}
void assertResults(final Flux<String> result)
{
final Flux<String> flux = result;
StepVerifier.create(flux)
.expectNext("a mono")
.expectNext("b mono")
.expectNext("c mono")
.verifyComplete();
Assertions.assertEquals(1L, counter.get());
}
查看 test1 和 test2 的测试结果
2020-01-20 12:55:22.542 INFO [] [] [ parallel-3] SampleTest : flux started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-4] SampleTest : mono started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-5] SampleTest : mono started
2020-01-20 12:55:24.548 INFO [] [] [ parallel-6] SampleTest : mono started
expected: <1> but was: <3>
我需要拒绝你的提议。在这两种情况下 getMono 都是 - 被调用的次数与不断变化的项目一样多 - 在通量的第一个元素到达后调用 这些是我想避免的互动。我的服务正在后台发出 http 请求,它们可能很耗时。
我目前的解决方案没有这个问题,但是如果我将记录器添加到我的 zip 中,我会得到这个
2020-01-20 12:55:20.505 INFO [] [] [ parallel-1] SampleTest : flux started
2020-01-20 12:55:20.508 INFO [] [] [ parallel-2] SampleTest : mono started
2020-01-20 12:55:21.523 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.528 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.535 WARN [] [] [ parallel-2] SampleTest : signal on mono
如您所见,将cache().repeat() 组合在一起会发出很多元素,我想知道这是否是一个问题,如果是,那么如何避免它(但保持单次调用和并行调用)。
【问题讨论】:
-
为什么不通过 Mono 进行平面映射,例如像这样
Mono.just(1).flux().flatMap(v1 -> Flux.just(2, 3).map(v2 -> v1 + v2)); -
你的问题是这个?流 1 发出 1,2,3,4 流 2 仅发出 a。当你想处理 1,a - 2,a - 3,a - 4,a 时,你只能处理 1,a 吗?
-
@efan - 这会起作用,但是(根据我对反应的理解)我只会在
Mono发出一个值时向Flux.just(2, 3)发出请求。我想同时启动这两个请求,并在结果可用时合并它们。 @RicardKollcaku - 不完全是。我能够处理1,a 2,a 3,a ...,但在实现它的过程中,由于cache().repeat()的工作方式,我产生了荒谬的a。 -
@RobertOzga 通过您的编辑,我更好地解释了为什么您不会多次创建单声道。我已经编辑了答案,所以 Mono 只会被创建 1 次
标签: java project-reactor reactor