【问题标题】:Flux to List<Objects> without blocking流到 List<Objects> 而不阻塞
【发布时间】:2020-09-18 11:28:44
【问题描述】:

正在寻找将 Flux 转换为 List&lt;Object&gt;。如果我使用 block() 会出错。因此,需要在不阻塞呼叫的情况下进行交谈。

Flux.from(Collection.find())

使用响应式编程,但 graphql 期望 List&lt;objects&gt; 并返回 Flux 时出错。

带有 Block() 的代码

public List<Test> findAll() {
        return Flux.from(testCollection.find()).collectList().block();

}

错误:-

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-kqueue-7

在这里,我需要返回List&lt;Test&gt;,因为由于某种原因我无法发送Flux&lt;Test&gt;

【问题讨论】:

  • 你不能。这需要更多细节 - 绝对没有办法以非阻塞方式将 Flux 转换为 Collection。有解决这个问题的方法 - 您可以切换通量以使用允许阻塞的单独线程,您可以在订阅者链中调用您的 graphql 库等 - 但很难/不可能用@987654321 判断哪种方法最好@.
  • 如果上下文是关于 Spring Webflux 的,你真的不需要,因为框架隐式订阅了flux/mono。
  • 发帖提示: (a) 请对您的作品进行拼写检查; (b) 人称代词“I”请始终使用大写字母; (c) 请不要添加诸如问候和感谢之类的对话材料; (d) 始终使用预览窗格,因此您可以查看您的材质是否已正确渲染。泛型表达式需要在这里修复。

标签: java reactive-programming project-reactor graphql-java


【解决方案1】:

以下示例是将flux&lt;Object&gt; 转换为List&lt;Object&gt;,但请记住,将 Flux 转换为 List 会使整个事情不具反应性:

public static void main(String[] args) {
    Flux<String> flux = Flux.just("test1", "test2", "test3");
    List<String> list = new ArrayList<>();
    flux.collectList().subscribe(list::addAll);
    list.forEach(System.out::println);
}

【讨论】:

    【解决方案2】:

    我假设Collection 类来自某个反应器库,正在执行一些 tcp/http 调用? collection.find 返回 Flux/Mono/Publisher 我假设?那不是因为collectList 不允许你这样做,那是因为你试图在一个NonBlocking 线程上运行block,我假设collection.find 在一个线程上发布元素是NonBlocking 的一个实例,来自它的名字reactor-http-kqueue-7,这可能是一个网络线程。

    您可以查看BlockingSingleSubscriber.blockingGet 方法,它会告诉您原因

        final T blockingGet() {
            if (Schedulers.isInNonBlockingThread()) {
                throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
            }
    ...
    

    如果你必须从调用者线程(调用 Flux.from)获取结果,那么你可以这样做

    Flux.from(testCondition.find())
    .publishOn(Schedulers.boundedElastic())
    .collectList()
    .block()
    

    【讨论】:

      【解决方案3】:

      如 cmets 所述,您不能。反应模式是停留在一个流程中。

      所以,

      Mono<GraphqlResponse> = Flux.just("A", "B" "C")
        .collectList()
        .map(this::someMethod);
      
      GraphqlResponse someMethod(List<String> abcs) {
          return graphQl.doSomething(abcs);
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-03-28
        • 2012-08-16
        • 1970-01-01
        • 1970-01-01
        • 2021-08-04
        相关资源
        最近更新 更多