【问题标题】:Source adding items to queue, signalled back when items have been completed源将项目添加到队列,当项目完成时返回信号
【发布时间】:2021-09-08 21:44:54
【问题描述】:

Reactor 中是否有办法从一个管道将项目添加到队列中,让另一个线程/Flux 从该队列中获取项目,并让原始管道等待另一个线程完成该项目的处理?

我需要与此类似的东西,因为可能有许多不同的来源,但我想要/需要一个用于背压、重试等的集中管道。

我现在有一些“有效”的东西,但它涉及大量线程和信号量的使用。不同的管道也会失去彼此之间的上下文,因此未来的事情(如反应式事务)将无法正常工作。

我目前正在使用的“工作”版本是这样的:

  public class Source {

    private final Queue queue = new Queue();

    public Flux<String> produce() {
      return Flux.range(0, 10)
          .doOnNext(v -> System.out.println("Before queue processed: " + v))
          .flatMap(this.queue::enqueue)
          .doOnNext(v -> System.out.println("After queue processed: " + v));
    }
  }

  public class Queue {

    private class WorkEntry {
      int number;
      String word;
      Throwable exception;
      final Semaphore semaphore = new Semaphore(0);
    }

    private final LinkedBlockingQueue<WorkEntry> blockingQueue = new LinkedBlockingQueue<>();

    public Mono<String> enqueue(int number) {

      return Mono.just(number)
          .flatMap(n -> {

            final var entry = new WorkEntry();
            entry.number = number;

            this.blockingQueue.add(entry);
            return Mono.just(entry);
          })
          .subscribeOn(Schedulers.boundedElastic())
          .flatMap(entry -> {

            try {

              // block until drain() has finished processing item
              entry.semaphore.acquire();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }

            // 'word' has been set inside drain() processing
            return Mono.just(entry.word);
          });
    }

    public void drain() {

      Flux
          .<WorkEntry>generate(sink -> {

            final var entry = blockingQueue.poll();
            if (entry == null) {
              sink.complete();
            } else {
              sink.next(entry);
            }
          })
          .flatMap(entry -> Mono.just(entry)
              .flatMap(e -> Mono.just(e.number + "!"))
              .onErrorResume(ex -> {
                entry.exception = ex;
                return Mono.empty();
              })
              .doOnSuccess(word -> {
                entry.word = word;
                entry.semaphore.release(1);
              }))
          .subscribeOn(Schedulers.boundedElastic())
          .subscribe();
    }
  }

  public class Runner {

    public void run() {

      final var source = new Source();

      source.produce()
          .subscribe();

      source.queue.drain();
    }
  }

将打印的内容:

Before queue processed: 0
...
Before queue processed: 9
After queue processed: 0!
...
After queue processed: 9!

当然,这与它在我的代码中的工作方式相去甚远。但我想它的主旨是可以理解的?

我怎样才能让它变得更好?

如何避免在.flatMap 中使用阻塞调用?

如何避免拉起这么多弹性线?

如何始终保持上下文相同?

将其重写为拥有一个从不同来源获取的 Flux?但是,这里的问题是,编写更简单的测试用例会变得更加困难,因为我制作了一个源推送项,并且知道该特定源何时完成。

编辑:我找到了一种更好的处理方法,无需信号量锁。不完美,但更好。这可以通过使用delayUntil 和接收器在源和队列之间进行通信来实现。

  ...

  public class Queue {

    private class WorkEntry {
      int number;
      final Sink.One<String> sink;
    }

    private final LinkedBlockingQueue<WorkEntry> blockingQueue = new LinkedBlockingQueue<>();

    public Mono<String> enqueue(int number) {

      return Mono.just(number)
          .flatMap(n -> {

            final var entry = new WorkEntry();
            entry.number = number;
            entry.sink = Sink.one();

            this.blockingQueue.add(entry);
            return Mono.just(entry);
          })
          .subscribeOn(Schedulers.boundedElastic())
          .delayUntil(e -> e.sink.asMono())
          .flatMap(e -> e.sink.asMono());
    }

    public void drain() {

      Flux
          .<WorkEntry>generate(sink -> {

            final var entry = blockingQueue.poll();
            if (entry == null) {
              sink.complete();
            } else {
              sink.next(entry);
            }
          })
          .flatMap(entry -> Mono.just(entry)
              .flatMap(e -> Mono.just(e.number + "!"))
              .doOnError(t -> entry.sink.tryEmitError(t))
              .doOnSuccess(word -> entry.sink.tryEmitValue(word))
          )
          .subscribeOn(Schedulers.boundedElastic())
          .subscribe();
    }
  }

...

这是可能的,因为 Sink.one().asMono() 在每次调用时都会返回相同的实例。所以我们既可以使用它来延迟,也可以在.flatMap() 中返回结果。

【问题讨论】:

  • 现在我看到了你的实现,我明白你想要什么,我可以直接告诉你,你基本上想要做的就是阻止,直到处理某些事情。反应式编程的全部目的是任何线程都可以从任何线程中断的地方继续,而你想要做的事情基本上违背了这个目的。

标签: java reactive-programming spring-webflux project-reactor reactor


【解决方案1】:

如果您的队列没有异步 api,您可以使用以下 Mono 方法之一:


// when async api returns some kind of readable value 
Mono<Integer> integerMono = Mono.fromCallable(() -> someBlockingService.getNextIntegerValue)
    .doOnError(throwable -> log.error("Error while calling someBlockingService", throwable));

// when async api returns void
Mono<Void> voidMono = Mono.fromRunnable(() -> someBlockingService.doSomething()
    .doOnError(throwable -> log.error("Error while calling someBlockingService", throwable));


还有更多的 Mono.from() 方法可以将 FutureCompletionStage 等其他类型的源封装到 Mono 中。

这样,以异步方式添加到队列非常简单:

        return Mono.just(number)
                .flatMap(n -> {
                    final var entry = new WorkEntry();
                    entry.number = number; // why do you need this line?
                    return Mono.fromCallable(() -> this.blockingQueue.add(entry))
                            .filter(added -> added)
                            .then(Mono.just(entry));
                })
                .subscribeOn(Schedulers.boundedElastic())

如果 add() 返回 false,Mono 将完全为空。但是您可以使用 switchIfEmpty() 返回默认条目。

等待 semaphore.acquire()

这更复杂。

.flatMap(entry -> {

            try {

              // block until drain() has finished processing item
              entry.semaphore.acquire();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }

            // 'word' has been set inside drain() processing
            return Mono.just(entry.word);
          });

flatMap() 内的代码应在 *“合理的时间内”*™️ 内响应。 Spring Webflux 有超时,如果获取信号量的时间超过超时允许的时间,则会发出错误。

你可以尝试使用 zipWith() 和 retry()

       return Mono.just(number)
                .flatMap(n -> {
                    final var entry = new WorkEntry();
                    entry.number = number; // why do you need this line?
                    return Mono.fromCallable(() -> this.blockingQueue.add(entry))
                            .filter(added -> added) // continue only if entry was added to queue
                            .zipWith(Mono.fromCallable(() -> {
                                try {
                                    entry.semaphore.acquire();
                                    return entry.word;
                                } catch (InterruptedException e) {
                                    log.error("error while waiting for semaphore", e);
                                    Thread.currentThread().interrupt();
                                }
                                throw new IllegalStateException("Could not acquire a semaphore in time");
                            }));
                })
                .subscribeOn(Schedulers.boundedElastic())
                .retry(10)
                .map(hasSemaphoreAndEntry -> {
                    Boolean hasSemaphore = hasSemaphoreAndEntry.getT1(); // always true
                    String word = hasSemaphoreAndEntry.getT2();
                    return word;
                })

                .doOnNext( word-> log.info("received {}", word)

您的其他问题

我怎样才能让它变得更好?

  • 如果可能,请尝试摆脱信号量。一些事件驱动的实现与 Webflux 一起工作得更好。

如何避免在 .flatMap 中使用阻塞调用?

  • 如上所示,尝试Mono.from() 方法之一。

如何避免缠绕这么多弹性线?

  • WebFlux 总是有等待的问题。转向事件驱动的实现对于响应式 WebFlux 方法来说更为自然。但我认为,这将是整个应用程序的架构更改。

如何始终保持上下文相同?

  • 我不确定我是否正确理解了这个问题。如果条目是保存上下文的对象,请始终将其作为连续 map()、zipWith() 和 flatMap() 调用的返回值。

【讨论】:

  • 上下文是指 Reactor 上下文本身。例如,如果我想要一个测试用例为@Transactional,并使用反应式事务......如果我在测试用例中的 Flux 与实际处理分离,那么它不会在测试用例完成后回滚,因为它将在另一个上下文/父事务中。我会试着看看你提到的。但我不确定我是否理解事件驱动的实现需要什么。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-04-11
  • 1970-01-01
  • 2015-08-23
  • 2021-08-25
  • 2017-01-09
  • 2023-01-16
  • 2012-04-22
相关资源
最近更新 更多