【问题标题】:Preload elements for Flux.generate(...)Flux.generate(...) 的预加载元素
【发布时间】:2020-01-12 22:59:31
【问题描述】:

我正在使用 Flux.generate() 创建一个 Flux。生成器(消费者)实际上是从消息队列中读取的。问题是这个调用需要很长时间(有时甚至 1-2 秒)。这将使助焊剂停止处理。

package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}

显然添加 parallel 并没有帮助,因为生成器仍然一次被调用一个。

Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel()) 
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();

有人知道如何使发电机并联吗?我不想使用Flux.create(),因为那样我会失去背压。

【问题讨论】:

  • 你打算做什么平行?从空阻塞队列中读取?
  • @AlexeiKaigorodov 是的,从 Apache Artemis 队列中并行读取。我打开了多个连接并且不关心消息确认(消息由服务器预先确认)。
  • 为什么不从文档中描述的 Apache Artemis 队列中读取?像 Flux 这样的任何额外包装器只会增加开销并降低并行度。
  • @AlexeiKaigorodov 我的整个应用程序都是反应式的(数据库驱动程序等),据我所知,Artemis 不提供任何反应式驱动程序。
  • 如果 Artemis 不提供响应式驱动程序,那么使用响应式流读取 Artemis 队列绝对没有意义。反应式流要求所有各方都遵守反应式协议。由于 Artemis 队列没有,将其用作发布者或订阅者只会导致错误。

标签: java reactive-programming project-reactor


【解决方案1】:
Mono.just(1).repeat()  // create infinite flux, maybe there is a nicer way for that?
    .flatMap(this::readFromQueue, 100) // define queue polling concurrency
    .flatMap(this::archiveDocument)
    .subscribe();
private Mono<String> readFromQueue(Integer ignore)
{
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    }).subscribeOn(Schedulers.elastic()); // dedicate blocking call to threadpool
}

【讨论】:

  • 我对这个仍然不完全满意,但它肯定更好。 :)
【解决方案2】:

问题在于vaultQueueConsumer 包含运行缓慢。 因此,解决方案是将这个缓慢的操作从generate 提取到可以并行化的map

作为一个想法,您可以生成一个队列名称,必须从中消费消息,并在使通量并行后在 map 方法中执行实际的消息消费:

String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
    .parallel()
    .runOn(Schedulers.parallel())
    .map(queueManipulator::readMessage)
    .doOnNext(log::info)
    .subscribe();

假的QueueManipulator在返回消息前会休眠1-2秒:

public class QueueManipulator {

  private final AtomicLong counter = new AtomicLong();

  public String readMessage(String queue) {
    sleep(); //sleep 1-2 seconds
    return queue + " " + counter.incrementAndGet();
  }
  //...
}

这样消息消费是并行完成的:

12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8

上面的这个解决方案很简单,购买可能看起来像“黑客”。

另一个想法是在flatMap内部调用Flux.generate

String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
    .flatMap(i ->
        Flux.<String>generate(synchronousSink -> {
          synchronousSink.next(queueManipulator.readMessage(queue));
        }).subscribeOn(Schedulers.parallel()))
    .doOnNext(log::info)
    .subscribe();

【讨论】:

  • 嗯,有趣又有创意。 :) 它解决了这个问题,但我觉得这是一个相当的技巧,而不是一个“正确”的解决方案。
  • 我奖励了你一个可行的解决方案。 ;)
  • 我用另一种可能看起来不那么老套的解决方案更新了答案 :)
【解决方案3】:

你试过了吗:

Flux.generate(vaultQueueConsumer)
 .parallel()
 .runOn(Schedulers.parallel()) 
 .flatMap(vaultDocumentManager::archiveDocument)
 .subscribe();

【讨论】:

  • 不幸的是,我做到了。问题是即使我启用并行处理也很慢,因为queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE) 需要很长时间。
猜你喜欢
  • 1970-01-01
  • 2012-04-08
  • 1970-01-01
  • 2011-05-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多