【问题标题】:Mono/Flux: how to suspend thread and wait for event or timeoutMono/Flux:如何暂停线程并等待事件或超时
【发布时间】:2019-08-08 13:29:31
【问题描述】:

我想请教如何解决 Mono/Flux 通过延迟或超时暂停和重新激活的问题。

任务是:应用程序将接收 HTTP 请求并应提供响应。

当使用异步套接字接收到请求时,应发送消息。而且我们需要等待具体的答复,但不能超过 30 秒。

所以我需要暂停线程,直到调用某个 Runnable,或者其他选项是每 0.2 秒查询某个变量,如果它设置为相关值,则继续该过程。

你能给我一些建议吗?

谢谢

【问题讨论】:

    标签: java kotlin project-reactor


    【解决方案1】:

    我认为您需要使用 Mono/Flux 中的超时方法来设置该行为。示例:

    yourMonoOrFlux.timeout(Duration.ofSeconds(30))
                  .onErrorResume(yourFallbackMethod)
                  ... //some other chained operations
    

    当出现问题时,也可以使用 onErrorResume 方法设置回退方法。

    但是如果你真的需要在这 30 秒内阻塞线程,你应该使用阻塞方法而不是超时。示例:

    yourMonoOrFlux.block(Duration.ofSeconds(30))
                  ... //other chained operations 
    

    参考官方reactor documentation

    【讨论】:

      【解决方案2】:

      我终于找到了解决办法。

      也许不是最优雅的,但可以使用递归

      此代码查询变量状态以获得肯定响应,但不超过 10 秒超时。

      val delayDuration = Duration.ofMillis(200)
      val maximumAttempts = 50
      
      fun createDelayedMono(counter : Int) : Mono<BigInteger> {
      
          val mono = Mono.delay(delayDuration).flatMap {
              it ->
      
              if (counter < maximumAttempts && reactorHelper.isEventCompleted(rrn)) {
                  reactorHelper.removeEvent(rrn)
                  return@flatMap Mono.just(BigInteger.ZERO)
              } else {
                  return@flatMap createDelayedMono(counter + 1)
              }
      
          }
      
          return mono
      }
      

      【讨论】:

        猜你喜欢
        • 2018-07-16
        • 1970-01-01
        • 2013-03-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-11-23
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多