【问题标题】:How to compose two parallel Tasks to cancel one task if another one fails?如果另一个任务失败,如何组合两个并行任务以取消一个任务?
【发布时间】:2015-10-11 13:30:09
【问题描述】:

我想用 scalaz.concurrent.Task 实现我的异步处理。我需要一个函数(Task[A], Task[B]) => Task[(A, B)] 来返回一个新任务,其工作方式如下:

  • 并行运行Task[A]Task[B],等待结果;
  • 如果其中一项任务失败,则取消第二项任务并等待它终止;
  • 返回两个任务的结果。

你会如何实现这样的功能?

【问题讨论】:

  • 这里cancel 可能意味着几个不同的东西。您只是希望计算快速失败吗?然后Nondeterminism 上的both 之类的东西就会起作用。如果您还想避免浪费循环(或者您想撤消仍在运行的计算的影响),它会更加复杂。
  • 是的,我现在只想让计算快速失败。

标签: scala concurrency task scalaz


【解决方案1】:

正如我上面提到的,如果您不关心实际停止非失败计算,您可以使用Nondeterminism。例如:

import scalaz._, scalaz.Scalaz._, scalaz.concurrent._

def pairFailSlow[A, B](a: Task[A], b: Task[B]): Task[(A, B)] = a.tuple(b)

def pairFailFast[A, B](a: Task[A], b: Task[B]): Task[(A, B)] =
  Nondeterminism[Task].both(a, b)

val divByZero: Task[Int] = Task(1 / 0)
val waitALongTime: Task[String] = Task {
  Thread.sleep(10000)
  println("foo")
  "foo"
}

然后:

pairFailSlow(divByZero, waitALongTime).run // fails immediately
pairFailSlow(waitALongTime, divByZero).run // hangs while sleeping
pairFailFast(divByZero, waitALongTime).run // fails immediately
pairFailFast(waitALongTime, divByZero).run // fails immediately

除第一种情况外,waitALongTime 中的副作用都会发生。如果您想尝试停止该计算,则需要使用类似TaskrunAsyncInterruptibly

【讨论】:

    【解决方案2】:

    Java 开发人员中有一个奇怪的概念,即您不应该取消并行任务。他们使用Thread.stop() 并将其标记为已弃用。如果没有Thread.stop(),您将无法真正取消未来。您所能做的就是向未来发送一些信号,或者修改一些共享变量并在未来内部编写代码以定期检查它。因此,所有提供 future 的库都可以建议取消 future 的唯一方法:合作完成。

    我现在面临同样的问题,正在为可能被取消的期货编写自己的库。有一些困难,但它们可能会得到解决。您只是不能在任意位置调用 Thread.stop() 。线程可以执行更新共享变量。锁会被正常调用,但更新可能会中途停止,例如仅更新 double 值的一半,依此类推。所以我要介绍一些锁。如果线程处于受保护状态,那么它现在应该被 Thread.stop() 杀死,但发送特定消息。守卫状态被认为总是非常快地等待。所有其他时间,在计算过程中,线程可能会被安全地停止并替换为新线程。

    所以,答案是:你不应该想取消期货,否则你就是异端,Java 社区中没有人愿意伸出援手。您应该定义自己的可以杀死线程的执行上下文,并且您应该编写自己的期货库以在此上下文上运行

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-06-28
    • 1970-01-01
    • 2014-08-05
    • 2022-07-11
    • 2013-10-23
    • 2017-06-22
    • 2018-06-01
    相关资源
    最近更新 更多