【发布时间】:2021-09-08 21:44:54
【问题描述】:
Reactor 中是否有办法从一个管道将项目添加到队列中,让另一个线程/Flux 从该队列中获取项目,并让原始管道等待另一个线程完成该项目的处理?
我需要与此类似的东西,因为可能有许多不同的来源,但我想要/需要一个用于背压、重试等的集中管道。
我现在有一些“有效”的东西,但它涉及大量线程和信号量的使用。不同的管道也会失去彼此之间的上下文,因此未来的事情(如反应式事务)将无法正常工作。
我目前正在使用的“工作”版本是这样的:
public class Source {
private final Queue queue = new Queue();
public Flux<String> produce() {
return Flux.range(0, 10)
.doOnNext(v -> System.out.println("Before queue processed: " + v))
.flatMap(this.queue::enqueue)
.doOnNext(v -> System.out.println("After queue processed: " + v));
}
}
public class Queue {
private class WorkEntry {
int number;
String word;
Throwable exception;
final Semaphore semaphore = new Semaphore(0);
}
private final LinkedBlockingQueue<WorkEntry> blockingQueue = new LinkedBlockingQueue<>();
public Mono<String> enqueue(int number) {
return Mono.just(number)
.flatMap(n -> {
final var entry = new WorkEntry();
entry.number = number;
this.blockingQueue.add(entry);
return Mono.just(entry);
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(entry -> {
try {
// block until drain() has finished processing item
entry.semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 'word' has been set inside drain() processing
return Mono.just(entry.word);
});
}
public void drain() {
Flux
.<WorkEntry>generate(sink -> {
final var entry = blockingQueue.poll();
if (entry == null) {
sink.complete();
} else {
sink.next(entry);
}
})
.flatMap(entry -> Mono.just(entry)
.flatMap(e -> Mono.just(e.number + "!"))
.onErrorResume(ex -> {
entry.exception = ex;
return Mono.empty();
})
.doOnSuccess(word -> {
entry.word = word;
entry.semaphore.release(1);
}))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
public class Runner {
public void run() {
final var source = new Source();
source.produce()
.subscribe();
source.queue.drain();
}
}
将打印的内容:
Before queue processed: 0
...
Before queue processed: 9
After queue processed: 0!
...
After queue processed: 9!
当然,这与它在我的代码中的工作方式相去甚远。但我想它的主旨是可以理解的?
我怎样才能让它变得更好?
如何避免在.flatMap 中使用阻塞调用?
如何避免拉起这么多弹性线?
如何始终保持上下文相同?
将其重写为拥有一个从不同来源获取的 Flux?但是,这里的问题是,编写更简单的测试用例会变得更加困难,因为我制作了一个源推送项,并且知道该特定源何时完成。
编辑:我找到了一种更好的处理方法,无需信号量锁。不完美,但更好。这可以通过使用delayUntil 和接收器在源和队列之间进行通信来实现。
...
public class Queue {
private class WorkEntry {
int number;
final Sink.One<String> sink;
}
private final LinkedBlockingQueue<WorkEntry> blockingQueue = new LinkedBlockingQueue<>();
public Mono<String> enqueue(int number) {
return Mono.just(number)
.flatMap(n -> {
final var entry = new WorkEntry();
entry.number = number;
entry.sink = Sink.one();
this.blockingQueue.add(entry);
return Mono.just(entry);
})
.subscribeOn(Schedulers.boundedElastic())
.delayUntil(e -> e.sink.asMono())
.flatMap(e -> e.sink.asMono());
}
public void drain() {
Flux
.<WorkEntry>generate(sink -> {
final var entry = blockingQueue.poll();
if (entry == null) {
sink.complete();
} else {
sink.next(entry);
}
})
.flatMap(entry -> Mono.just(entry)
.flatMap(e -> Mono.just(e.number + "!"))
.doOnError(t -> entry.sink.tryEmitError(t))
.doOnSuccess(word -> entry.sink.tryEmitValue(word))
)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
...
这是可能的,因为 Sink.one() 的 .asMono() 在每次调用时都会返回相同的实例。所以我们既可以使用它来延迟,也可以在.flatMap() 中返回结果。
【问题讨论】:
-
现在我看到了你的实现,我明白你想要什么,我可以直接告诉你,你基本上想要做的就是阻止,直到处理某些事情。反应式编程的全部目的是任何线程都可以从任何线程中断的地方继续,而你想要做的事情基本上违背了这个目的。
标签: java reactive-programming spring-webflux project-reactor reactor