【问题标题】:How can I create reactor Flux from a blocking queue?如何从阻塞队列中创建反应器通量?
【发布时间】:2019-04-08 05:43:48
【问题描述】:

我正在尝试实现从 BlockingQueue 创建的反应器 Flux,但不确定哪个运算符最适合我的用例?

我正在创建一个流式 REST 端点,其中响应是 Flux,它需要不断从 BlockingQueue 发出消息作为对 GET REST 调用的响应。

我已经尝试过论坛和文档,只能找到从可迭代集合或反应数据源发起的 Flux,但没有任何 BlockingQueue 的示例。

【问题讨论】:

  • 添加更多描述或您的代码 sn-p 以便其他人能够理解您的问题并帮助您。

标签: java spring-webflux project-reactor


【解决方案1】:

你可以试试Flux#generateQueue#peek。请记住,如果队列为空,peek 将返回null,并且不能在onNext 中使用。

类似:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});

还有Flux#repeatWhen 运算符,以防您想在队列被视为空后重新订阅,例如与:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))

【讨论】:

  • 成功了。虽然,一旦服务器发出接收器完成的信号,我的客户端(浏览器)就会一次又一次地发送请求。所以我改为BlockingQueue#take。不确定这如何适合反应式异步处理。
  • 小心,因为#take 正在阻塞。确保您订阅了对阻塞友好的调度程序(例如Schedulers.elastic()
  • @bsideup,为什么这里使用peek() 而不是poll() 来从队列中删除元素,一旦它被流水线化了?
猜你喜欢
  • 1970-01-01
  • 2010-10-06
  • 2014-10-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-05-01
  • 1970-01-01
相关资源
最近更新 更多