【问题标题】:Kotlin coroutines - async withTimeout which stops blocking the thread once it runs out of timeKotlin coroutines - async withTimeout 一旦线程超时就停止阻塞
【发布时间】:2021-05-17 08:38:06
【问题描述】:

我正在尝试创建一个功能,该功能会触发可能无法取消的缓慢操作。我希望此操作在超时的协程中运行。由于操作不能如前所述取消,我需要函数在超时后返回,但操作留在后台。

我一直在尝试运行的代码异步运行 10 秒的冗长操作,其超时时间为 5 秒,因此该函数应在超时后返回并让 main 继续其工作,打印“foo execution finished ”,最后 5 秒后,慢速作业将打印“作业结束(经过 10 秒)”。

代码如下:

fun main() {
    println("program execution begins")
    foo()
    println("foo execution finished")
    while(true);
}

fun foo() = runBlocking {
    val job = async {
        val endTimeMillis = System.currentTimeMillis() + (10 * 1000)

        while (System.currentTimeMillis() <= endTimeMillis); //blocks for 10 seconds
        println("job ends (10 seconds passed)")
    }

    try {
        withTimeout(5000) {
            println("start awaiting with 5 secs timeout")
            job.await()
        }
    } catch (ex: TimeoutCancellationException) {
        println("out of time")
    }
}

然后产生以下结果:

program execution begins
start awaiting with 5 secs timeout
job ends (10 seconds passed)
out of time
foo execution finished

但这并不是我在前面提到的这种情况下需要的行为。我需要使输出看起来像:

program execution begins
start awaiting with 5 secs timeout
out of time
foo execution finished
job ends (10 seconds passed)

除此之外,我不能在异步中使用任何类型的“kotlin-coroutines”函数来归档这种行为(好吧,配合取消),因为那里调用的代码将与用户代码无关协程,可能是用 Java 编写的。因此用于阻塞异步块的 while 循环而不是示例中的 delay()。

提前感谢您的帮助!

【问题讨论】:

  • @Alex.T 它不会直接“返回某些东西”,而是通过我制作的单独机制将数据发送到我程序的另一部分。但是我仍然需要等待该数据发送才能继续,或者如果花费的时间太长则超时并继续前进,这样我的整个程序就不会冻结。
  • 对不起,误删了评论。对于任何想知道的人,我在问async 块是否有预期的实际返回值。

标签: kotlin asynchronous


【解决方案1】:

如果您不能中断阻塞代码,那么您需要在不同的线程中运行它。否则你的线程将没有机会处理超时。

您还需要确保包含阻止代码的Job 不是您等待的Job 的子代。否则超时会取消阻塞Job,但它仍会旋转10秒,runBlocking会等待它完成。

完成这两件事的最简单方法是使用GlobalScope,如下所示:

fun foo()  = runBlocking {
    try {
        withTimeout(5000) {
            println("start awaiting with 5 secs timeout")
            GlobalScope.async {
                val endTimeMillis = System.currentTimeMillis() + (10 * 1000)

                while (System.currentTimeMillis() <= endTimeMillis); //blocks for 10 seconds
                println("job ends (10 seconds passed)")
            }.await()
        }
    } catch (ex: TimeoutCancellationException) {
        println("out of time")
    }
}

当然,即使您停止等待,该线程仍会旋转 10 秒...这太糟糕了,所以我希望您真的有充分的理由想要这个。

【讨论】:

  • 谢谢!是的,不幸的是,我真的没有其他选择可以这样做......这完全取决于一些用户代码。旋转的 while 循环只是为了举例说明问题,但我们永远无法确定那里会发生什么......
  • 用户可以编写无限循环。如果它是用户代码,那么您应该真正在自己的进程中运行它。您可以可靠且安全地终止进程。
  • 在我的情况下,多处理也不是一个选项......但是,如果可能的话,应该使用用户代码。
  • @serivesmejia 实际上你想在Dispatchers.IO 中以GlobalScope.launch(Dispatchers.IO) {} 启动一个阻塞操作,因为这可以使用来自CommonPool 的少量线程,而Dispatchers.Default 中使用的线程在GlobalScope 中的数量有限它可以从 CommonPool 借用的线程数。
【解决方案2】:

这样的事情应该可以工作:

fun main() {
    println("start")
    foo()
    println("foo finished")
    while (true);
}

fun foo() {
    val start = System.currentTimeMillis()
    GlobalScope.launch {
        val endTimeMillis = System.currentTimeMillis() + (10 * 1000)

        while (System.currentTimeMillis() <= endTimeMillis); //blocks for 10 seconds
        println("${start.secondsSince()} job ends (10 seconds passed)")
    }
    println("${start.secondsSince()}  waiting")
    runBlocking {
        delay(5000)
    }
    println("${start.secondsSince()}  finished waiting")
}

fun Long.secondsSince() = (System.currentTimeMillis() - this) / 1000.00

这将输出:

start
0.04  waiting
5.049  finished waiting
foo finished
10.043 job ends (10 seconds passed)

请忽略计算秒数的可怕方式,现在是凌晨 2 点,但它确实证明了这一点。

现在来看看为什么会这样。首先,我建议阅读此SO question on the difference between async and launch。 TLDR launch 用于一劳永逸,因为你对任何结果都不感兴趣(不需要返回值),所以没有必要使用async。其次是delay 上的文档。

你的不工作的原因是(据我所知)

  1. 您在job.await 上使用了withTimeout ,这与在协程本身上运行它并不完全相同。
  2. foo fun 被阻塞了,这意味着它会一直等待从其中启动的所有协程,然后再继续。这意味着您永远不会在job ends (10 seconds passed) 之前获得foo execution finished

顺便说一句,实际的最终代码只是:

fun foo() {
    GlobalScope.launch {
       //code that takes a long time
    }
    
    runBlocking { delay(5000) }
    
    //other code that will continue after 5 seconds
}

【讨论】:

  • 哟,谢谢你的解释!对于这种特定情况,这是一个很好的解决方案,尽管这在我需要的情况下不能很好地工作。您的解决方案总是等待“超时”(5)秒,但是当“需要很长时间的代码”时,我需要该函数返回完成它的工作,或者它超时(并且“需要很长时间的代码”在超时的情况下继续在后台运行)。因此,对于我的用例来说,总是阻塞 5 秒并不是最佳选择。可能对我没有具体说明那部分是不好的。我真的很感谢你的答案!
【解决方案3】:

虽然我已经标记了一个接受的答案,但值得注意的是我的解决方案最终有点不同。 [供后代阅读:),你好未来!]

@Alex.T 指出:

foo fun 被阻塞了,这意味着它会一直等待所有从它内部启动的协程,然后再继续。这意味着您永远不会在作业结束(经过 10 秒)之前完成 foo 执行。

考虑到这一点,我提出了以下解决方案:

fun main() {
    println("program execution begins")
    foo()
    println("foo execution finished")
    while(true);
}

fun foo() {
    val job = GlobalScope.launch {
        val endTimeMillis = System.currentTimeMillis() + 10000

        while (System.currentTimeMillis() <= endTimeMillis);
        println("job ends (10 seconds passed)")
    }

    runBlocking {
        try {
            withTimeout(5000) {
                println("start awaiting with 5 secs timeout")
                job.join()
            }
        } catch (ex: TimeoutCancellationException) {
            println("out of time")
            job.cancel()
        }
    }
}

它不会用 runBlocking 包装两个协程,只有 withTimeout。所以我们实际的阻塞操作在全局范围内单独运行,因此,它会产生以下输出

program execution begins
start awaiting with 5 secs timeout
out of time
foo execution finished
job ends (10 seconds passed)

(问题中我想要的实际输出,哇!)

这里还值得指出@Matt Timmermans 所说的话:

当然,即使您停止等待,该线程也会旋转 10 秒...这太糟糕了,所以我希望您真的有充分的理由想要这个。

所以请谨慎使用。

【讨论】:

    猜你喜欢
    • 2018-12-29
    • 1970-01-01
    • 1970-01-01
    • 2019-04-10
    • 1970-01-01
    • 1970-01-01
    • 2019-12-26
    • 1970-01-01
    • 2019-10-11
    相关资源
    最近更新 更多