【问题标题】:Kotlin coroutine future await with timeout (no cancellation)Kotlin 协程未来等待超时(无取消)
【发布时间】:2019-06-19 14:12:04
【问题描述】:

鉴于我们有一个 CompletableFuture f,在 kotlin 可暂停范围内,我们可以调用 f.await(),我们将暂停直到它完成。

我在实现带有签名 f.await(t) 的类似函数时遇到问题,该函数必须暂停最多 t 毫秒,或者如果未来在该持续时间内完成(以先发生者为准),则更快返回。

这是我尝试过的。

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }

   return future.isDone

}

fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }

   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

我还需要一个类似的功能来完成工作。但也许这个解决方案也可以帮助我......

输出是

computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done

【问题讨论】:

    标签: java kotlin kotlin-coroutines coroutine completable-future


    【解决方案1】:

    我已经写了一些测试代码:

    fun main(args: Array<String>) = runBlocking {
        val future = calculateAsync()
        val result = future.await(2000)
        println("result=$result")
    }
    
    suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
        val future = this
        var result: T? = null
        try {
            withTimeout(duration) {
                result = future.await()
            }
        } catch (t: TimeoutCancellationException) {
            println("timeout exception")
        } catch (e: Throwable) {
            e.printStackTrace()
        }
    
        return result
    }
    
    @Throws(InterruptedException::class)
    fun calculateAsync(): CompletableFuture<String> {
        val completableFuture = CompletableFuture<String>()
    
        Executors.newCachedThreadPool().submit {
            Thread.sleep(3000)
            println("after sleep")
            completableFuture.complete("Completed")
        }
    
        return completableFuture
    }
    

    运行这段代码后,我们会得到一个输出:

    timeout exception
    result=null
    after sleep
    

    我们看到我们的扩展函数 await 返回 null,因为我们将超时设置为 2000 毫秒,但 CompletableFuture 在 3000 毫秒后完成。在这种情况下CompletableFuture 被取消(它的isCancelled 属性返回true),但是我们在calculateAsync 函数中运行的线程继续执行(我们在日志中看到它after sleep)。

    如果我们在 main 函数中将超时持续时间设置为 4000 毫秒 future.await(4000),我们将看到下一个输出:

    after sleep
    result=Completed
    

    现在我们有了一些结果,因为CompletableFuture 的执行速度超过了 4000 毫秒。

    【讨论】:

    • 它正在被取消,可完成的未来与运行您提交的代码的线程无关,在第一次超时后,未来对象已完成...检查自己,或在取消中放置调试断点可完成未来的方法... CF 不会以任何方式中断或终止底层线程...
    • 或者您可以在第一次提前超时后简单地检查 future.isDone() ......这将是真的
    • 我对我的回答做了一些修改。 CompletableFuture 中还有 isCancelled 属性。如果执行超过超时,则返回true,否则返回false
    • 谢尔盖我很欣赏你的努力,但这不是目标,我们希望未来不被触及,我们只是需要一种偶尔检查是否完成的方法(无论是未来还是工作),是的,我们可以在循环中使用延迟,但在这种情况下,如果未来完成,我们仍然会失去延迟时间......请参阅我下面的解决方案,它不会触及未来并且完全符合我们想要的但我不确定如何好的解决方案是性能方面的......
    【解决方案2】:

    这是我想出的,我认为这不是一个好的解决方案,因为我很可能会为相当原始的任务创建大量垃圾。

    
    suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
       val timeout = CompletableFuture<Unit>()
    
       GlobalScope.launch {
          delay(duration)
          timeout.complete(Unit)
       }
    
       val anyOfTwo = CompletableFuture.anyOf(this, timeout)
       anyOfTwo.await()
       return this.isDone
    }
    
    
    fun main() = runBlocking {
       val future = CompletableFuture<String>()
    
       GlobalScope.launch {
          delay(2000)
          println("setting the result (future now ${future.isDone})")
          future.complete("something")
       }
    
       while (future.isNotDone()) {
          println("waiting for the future to complete for the next 500ms")
          val isDone = future.await(500)
    
          if (isDone) {
             println("future is done")
             break
          } else {
    
             println("future not done")
          }
       }
    
       Unit
    }
    

    这将给出

    的输出
    waiting for the future to complete for the next 500ms
    future not done
    waiting for the future to complete for the next 500ms
    future not done
    waiting for the future to complete for the next 500ms
    future not done
    waiting for the future to complete for the next 500ms
    setting the result (future now false)
    future is done
    

    这正是我们想要的……

    【讨论】:

      猜你喜欢
      • 2021-01-04
      • 2020-07-23
      • 2022-01-18
      • 1970-01-01
      • 2020-02-23
      • 1970-01-01
      • 2019-10-28
      • 2020-02-22
      • 2019-02-21
      相关资源
      最近更新 更多