【问题标题】:Scala: Cool way to manage sequential execution of Futures?Scala:管理期货顺序执行的酷方法?
【发布时间】:2014-03-21 08:52:14
【问题描述】:

我正在尝试用 Scala 编写一个数据模块。

在并行加载整个数据时,一些数据依赖于其他数据,因此必须以有效的方式管理执行顺序。

例如在代码中,我保留了一张带有数据名称和清单的地图

val dataManifestMap = Map(
  "foo" -> manifest[String],
  "bar" -> manifest[Int],
  "baz" -> manifest[Int],
  "foobar" -> manifest[Set[String]], // need to be executed after "foo" and "bar" is ready
  "foobarbaz" -> manifest[String], // need to be executed after "foobar" and "baz" is ready
)

这些数据将存储在可变哈希映射中

private var dataStorage = new mutable.HashMap[String, Future[Any]]()

有些代码会加载数据

def loadAllData(): Future[Unit] = {
  Future.join(
    (dataManifestMap map {
      case (data, m) => loadData(data, m) } // function has all the string matching and loading stuff
    ).toSeq
  )    
}

def loadData[T](data: String, m: Manifest[T]): Future[Unit] = {
  val d = data match {
    case "foo" => Future.value("foo")
    case "bar" => Future.value(3)
    case "foobar" => // do something with dataStorage("foo") and dataStorage("bar")
    ... // and so forth (in a real example it would be much more complicated for sure)
  }

  d flatMap { 
    dVal => { this.synchronized { dataStorage(data) = dVal }; Future.value(Unit) }
  }
}

这样,当“foo”和“bar”准备好时,我无法确保“foobar”已加载,等等。

由于我可能有数百个不同的数据,我该如何以“酷”的方式管理它?

如果我可以拥有某种数据结构,其中包含必须在某事之后加载某事的所有信息,并且顺序执行可以由 flatMap 以一种简洁的方式处理,那将是“很棒的”。

提前感谢您的帮助。

【问题讨论】:

  • 可能在这里使用 Actors 会更有意义?
  • 对不起,糟糕的 sudo 代码。此外,我需要确保在尝试加载“foobar”时“foo”和“bar”已准备就绪。所以解决方案不一定需要与Futures的顺序执行相关。它可以正确地使用 Promise,或者任何可以达到目的的东西。谢谢。
  • 这正是有状态的参与者可以派上用场的地方。
  • @Ashalynd 谢谢你的建议!我仍然不确定我应该使用什么设计:)

标签: scala future


【解决方案1】:

在所有条件相同的情况下,我倾向于使用for 理解。例如:

def findBucket: Future[Bucket[Empty]] = ???
def fillBucket(bucket: Bucket[Empty]): Future[Bucket[Water]] = ???
def extinguishOvenFire(waterBucket: Bucket[Water]): Future[Oven] = ???
def makeBread(oven: Oven): Future[Bread] = ???
def makeSoup(oven: Oven): Future[Soup] = ???
def eatSoup(soup: Soup, bread: Bread): Unit = ???


def doLunch = {
  for (bucket <- findBucket;
       filledBucket <- fillBucket(bucket);
       oven <- extinguishOvenFire(filledBucket);
       soupFuture = makeSoup(oven);
       breadFuture = makeBread(oven);
       soup <- soupFuture;
       bread <- breadFuture) {
    eatSoup(soup, bread)
  }
}

这将期货链接在一起,并在满足依赖关系后调用相关方法。请注意,我们在for 理解中使用= 以允许我们同时启动两个Futures。就目前而言,doLunch 返回Unit,但如果您将最后几行替换为:

// ..snip..
       bread <- breadFuture) yield {
    eatSoup(soup, bread)
    oven
  }
}

然后它将返回 Future[Oven] - 如果您想在午餐后使用烤箱做其他事情,这可能会很有用。

至于您的代码,我的第一个想法是您应该考虑Spray cache,因为它看起来可能符合您的要求。如果没有,我的下一个想法是替换您当前拥有的Stringly typed 接口,并使用基于类型化方法调用的东西:

private def save[T](key: String)(value: Future[T]) = this.synchronized {
  dataStorage(key) = value
  value
}

def loadFoo = save("foo"){Future("foo")}
def loadBar = save("bar"){Future(3)}
def loadFooBar = save("foobar"){
  for (foo <- loadFoo;
       bar <- loadBar) yield foo + bar // Or whatever
}
def loadBaz = save("baz"){Future(200L)}
def loadAll = {
  val topLevelFutures = Seq(loadFooBar, loadBaz)
  // Use standard library function to combine futures
  Future.fold(topLevelFutures)(())((u,f) => ())
}

// I don't consider this method necessary, but if you've got a legacy API to support...
def loadData[T](key: String)(implicit manifest: Manifest[T]) = {
  val future = key match {
      case "foo" => loadFoo
      case "bar" => loadBar
      case "foobar" => loadFooBar
      case "baz" => loadBaz
      case "all" => loadAll
  }
  future.mapTo[T]
}

【讨论】:

  • 感谢您的回答。我仍然不确定我应该实现您的设计还是使用 Actors...
  • 也许你可以告诉我们更多关于你的问题的细节。例如,任务序列是否需要在运行时修改(可能由非技术用户),或者如果它在编译时固定就可以了吗?可以通过检查数据以某种方式确定正确的顺序吗?我们实际上在谈论什么样的数据,以及使用什么样的代码来加载它?这些东西可能暗示了一种特定的设计。
  • 1.不,它不必是可修改的。顺序将被固定。虽然,除了在开始时加载整个数据之外,“foobar”可能会在运行期间单独重新加载(数据依赖于其他数据),因此在这种情况下,必须预先加载“foo”和“bar”请求加载“foobar”时。 2. 确定顺序时检查数据无济于事 3. 数据将由 Int、Map 等组成。数据将从 redis、MySQL 和 msgpack 缓存的数据中加载。目前所有代码都是用 Scala 编写的。
  • (详述1) loadData[T] 可以单独调用,不仅在loadAllData中。因此,当调用 loadData("foobar") 并且未加载 "foo" 和 "bar" 时,我们应该在加载 "foobar" 之前加载 "foo" 和 "bar"!希望这会有所帮助!
  • 这听起来有点像缓存。这公平吗?你会考虑使用像 spray-cache (spray.io/documentation/1.1-SNAPSHOT/spray-caching) 这样的东西吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-04
  • 2018-06-13
  • 2016-07-11
  • 2018-08-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多