【问题标题】:How to invoke a list of callbacks that return a Future one after another如何调用一个又一个返回 Future 的回调列表
【发布时间】:2015-09-13 19:55:29
【问题描述】:

我需要一种机制来异步调用多个回调...所以我实现了以下类:

class AsyncCallbacks[T] {

  private val callbacks = new ListBuffer[T => Future[Unit]]()

  def +=(f: T => Future[Unit]) = callbacks += f
  def -=(f: T => Future[Unit]) = callbacks -= f

  def invoke(data: T) = Future.sequence(callbacks.map(_(data)))
}

...

def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }

val callbacks = new AsyncCallbacks[Int]
callbacks += f1
callbacks += f2
callbacks.invoke(5)

callbacks.invoke 产生一个scala.concurrent.Future[scala.collection.mutable.ListBuffer[Unit]]... 我想知道是否有更好、更有效的方法来调用所有注册的回调而不生成一个无用的Units 列表。

上面的实现还有一个问题...假设我们有以下方法...

def l1 = Future { List.fill(5)("1") }
def l2 = Future { List.fill(5)("2") }

...然后我像这样调用它们:

for {
  a <- l1
  b <- l2
  c <- callbacks.invoke(5)
} yield b

callbacks.invoke 有效...但它看起来永远不会返回...

编辑

好的,我尝试按照I.K. 的建议使用scalaz 重新实现我的AsyncCallbacks 类:

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.concurrent.Task

class AsyncCallbacks[T] {

  private val tasks = new ListBuffer[Task[T => Future[Unit]]]()

  /** Gets the number of callbacks registered. */
  def count = tasks.length

  /** Clears all the registered callbacks. */
  def clear = tasks.clear

  /* Adds the specified function to the list of callbacks to be invoked. */
  def +=(f: T => Future[Unit]) = tasks += Task(f)

  /** Invokes all the registered callbacks. */
  def invoke(data: T) = Future { Task.gatherUnordered(tasks).map(_.map(_(data))).run.length }
}

这是它的用法:

def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }

val callbacks = new AsyncCallbacks[Int]()
callbacks += f1
callbacks += f2
callbacks.invoke(4) // prints 4 two times (f1 + f2)

现在只需从 REPL 执行上面的代码...然后尝试多次调用 `callbacks.invoke(4) ,您将看到您不再能够退出 REPL(它仍然被阻止并且您必须使用 CTRL-C 退出)。我认为这在实际应用中可能是个问题。

【问题讨论】:

  • 你愿意使用Scalaz吗?
  • 哇...是的,但我不知道...
  • 如果您不关心回调函数是否成功完成(不生成无用的单位列表),您可以执行callbacks.foreach(_(data)),这将为您提供@987654334 @ 作为结果类型。

标签: scala future


【解决方案1】:

从您的帖子看来,无论您要放入 Future 正文的数据类型如何,您都希望它完成并收到通知。

在 Scalaz 中,它将被建模为 Task,它本质上是下面的 Future,但带有附加功能。

一些例子,

scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task

scala> val tasks = (1 |->  5).map(n => Task { Thread.sleep(100); n })
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@72b64eae, scalaz.concurrent.Task@3f6a6af, scalaz.concurrent.Task@5ba0314c, scalaz.concurrent.Task@36718c9f, scalaz.concurrent.Task@767277c1)

scala> Task.gatherUnordered(tasks).run
res10: List[Int] = List(4, 1, 2, 3, 5)

scala> Task.gatherUnordered(tasks).run
res11: List[Int] = List(3, 1, 2, 4, 5)

scala> Task.gatherUnordered(tasks).run
res12: List[Int] = List(2, 1, 3, 4, 5)

如您所见,每次运行这些任务时,输出都是不同的。 Task 实现是不确定的。

以你为例,

scala> val tasks = List(Task{1},Task{2})
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@2858b10a, scalaz.concurrent.Task@3782f5d8)

scala> Task.gatherUnordered(tasks).run
res13: List[Int] = List(1, 2)

scala> val tasks = List(Task{List.fill(5)("1")}, Task{List.fill(5)("2")})
tasks: List[scalaz.concurrent.Task[List[String]]] = List(scalaz.concurrent.Task@1c8dd945, scalaz.concurrent.Task@71f8e5ff)

scala> Task.gatherUnordered(tasks).run
res17: List[List[String]] = List(List(1, 1, 1, 1, 1), List(2, 2, 2, 2, 2))

【讨论】:

  • 非常感谢 :-) 刚刚尝试过...但请参阅我更新的帖子。我已经使用 scalaz 重新实现了我的 AsyncCallback 类,但与旧实现一样,REPL 在几次尝试后仍然被阻止。
  • 你为什么还需要你的AsyncCallback? Scalaz 中也有一个runAsync 方法。
  • 在我的真实应用程序中,我有一组函数T =&gt; Future[Unit] 可以在这样的理解中调用:for { transactions &lt;- Future.sequence(orders.map { order =&gt; for { transaction &lt;- createTransaction(order.id) _ &lt;- myCallbacks(transaction) } yield transaction }) }
  • 使用runAsync 方法并在其主体中传递一个函数,以便在每个任务完成时调用。或者,如果您对 Scalaz 不太熟悉,您可以使用 Async scala 库。
猜你喜欢
  • 2021-03-13
  • 1970-01-01
  • 1970-01-01
  • 2020-07-25
  • 2021-09-23
  • 2019-11-18
  • 2019-05-18
  • 2016-08-01
  • 1970-01-01
相关资源
最近更新 更多