【发布时间】:2020-01-12 22:59:31
【问题描述】:
我正在使用 Flux.generate() 创建一个 Flux。生成器(消费者)实际上是从消息队列中读取的。问题是这个调用需要很长时间(有时甚至 1-2 秒)。这将使助焊剂停止处理。
package com.github.loa.vault.service.listener;
import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;
import java.util.function.Consumer;
@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {
private final QueueManipulator queueManipulator;
@Override
public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);
documentSourceItemSynchronousSink.next(
DocumentArchivingContext.builder()
.type(DocumentType.valueOf(documentArchivingMessage.getType()))
.source(documentArchivingMessage.getSource())
.content(documentArchivingMessage.getContent())
.build()
);
}
}
显然添加 parallel 并没有帮助,因为生成器仍然一次被调用一个。
Flux.generate(vaultQueueConsumer)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(vaultDocumentManager::archiveDocument)
.subscribe();
有人知道如何使发电机并联吗?我不想使用Flux.create(),因为那样我会失去背压。
【问题讨论】:
-
你打算做什么平行?从空阻塞队列中读取?
-
@AlexeiKaigorodov 是的,从 Apache Artemis 队列中并行读取。我打开了多个连接并且不关心消息确认(消息由服务器预先确认)。
-
为什么不从文档中描述的 Apache Artemis 队列中读取?像 Flux 这样的任何额外包装器只会增加开销并降低并行度。
-
@AlexeiKaigorodov 我的整个应用程序都是反应式的(数据库驱动程序等),据我所知,Artemis 不提供任何反应式驱动程序。
-
如果 Artemis 不提供响应式驱动程序,那么使用响应式流读取 Artemis 队列绝对没有意义。反应式流要求所有各方都遵守反应式协议。由于 Artemis 队列没有,将其用作发布者或订阅者只会导致错误。
标签: java reactive-programming project-reactor