【发布时间】:2015-09-13 19:55:29
【问题描述】:
我需要一种机制来异步调用多个回调...所以我实现了以下类:
class AsyncCallbacks[T] {
private val callbacks = new ListBuffer[T => Future[Unit]]()
def +=(f: T => Future[Unit]) = callbacks += f
def -=(f: T => Future[Unit]) = callbacks -= f
def invoke(data: T) = Future.sequence(callbacks.map(_(data)))
}
...
def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }
val callbacks = new AsyncCallbacks[Int]
callbacks += f1
callbacks += f2
callbacks.invoke(5)
callbacks.invoke 产生一个scala.concurrent.Future[scala.collection.mutable.ListBuffer[Unit]]... 我想知道是否有更好、更有效的方法来调用所有注册的回调而不生成一个无用的Units 列表。
上面的实现还有一个问题...假设我们有以下方法...
def l1 = Future { List.fill(5)("1") }
def l2 = Future { List.fill(5)("2") }
...然后我像这样调用它们:
for {
a <- l1
b <- l2
c <- callbacks.invoke(5)
} yield b
callbacks.invoke 有效...但它看起来永远不会返回...
编辑
好的,我尝试按照I.K. 的建议使用scalaz 重新实现我的AsyncCallbacks 类:
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.concurrent.Task
class AsyncCallbacks[T] {
private val tasks = new ListBuffer[Task[T => Future[Unit]]]()
/** Gets the number of callbacks registered. */
def count = tasks.length
/** Clears all the registered callbacks. */
def clear = tasks.clear
/* Adds the specified function to the list of callbacks to be invoked. */
def +=(f: T => Future[Unit]) = tasks += Task(f)
/** Invokes all the registered callbacks. */
def invoke(data: T) = Future { Task.gatherUnordered(tasks).map(_.map(_(data))).run.length }
}
这是它的用法:
def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }
val callbacks = new AsyncCallbacks[Int]()
callbacks += f1
callbacks += f2
callbacks.invoke(4) // prints 4 two times (f1 + f2)
现在只需从 REPL 执行上面的代码...然后尝试多次调用 `callbacks.invoke(4) ,您将看到您不再能够退出 REPL(它仍然被阻止并且您必须使用 CTRL-C 退出)。我认为这在实际应用中可能是个问题。
【问题讨论】:
-
你愿意使用
Scalaz吗? -
哇...是的,但我不知道...
-
如果您不关心回调函数是否成功完成(不生成无用的单位列表),您可以执行
callbacks.foreach(_(data)),这将为您提供@987654334 @ 作为结果类型。