【问题标题】:How to invoke a method again and again until it returns a `Future` value containing `None`如何一次又一次地调用一个方法,直到它返回一个包含“None”的“Future”值
【发布时间】:2014-10-13 21:40:46
【问题描述】:

给定一个返回 Future 这样的方法...

def remove(id: String): Future[Option[User]] = Future {
  // removes and returns the user identified by `id`
}

...如何一次又一次地调用它,直到它返回一个包含NoneFuture 值?

编辑

也许值得一提的是,我不需要收集结果。只要找到要删除的用户,我只需要调用该方法。这个想法是有一个loopremove 返回Future[None] 时停止。

【问题讨论】:

  • 您要收集结果还是...
  • 不,我不需要收集结果 - 查看我的更新。

标签: scala future


【解决方案1】:

之前有人评论说没有意义。

令我惊讶的是,懒惰地消费期货并没有什么捷径可走。 Future.find 类似于 firstCompletedOf,并不代表 find first in traversable order

scala> import concurrent._, ExecutionContext.Implicits._
import concurrent._
import ExecutionContext.Implicits._

scala> import java.util.concurrent.atomic._
import java.util.concurrent.atomic._

scala> val count = new AtomicInteger(10)
count: java.util.concurrent.atomic.AtomicInteger = 10

scala> def f(s: String) = Future { if (count.decrementAndGet <= 0) None else Some(s) }
f: (s: String)scala.concurrent.Future[Option[String]]

scala> def g(ss: List[String]): Future[List[String]] = f("hello") flatMap { case None => Future.successful(ss) case Some(s) => g(s :: ss) }
g: (ss: List[String])scala.concurrent.Future[List[String]]

scala> g(Nil)
res0: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@65a15628

scala> .value
res1: Option[scala.util.Try[List[String]]] = Some(Success(List(hello, hello, hello, hello, hello, hello, hello, hello, hello)))

说明不阻塞的实用性:

scala> :pa
// Entering paste mode (ctrl-D to finish)

import scala.util._
import concurrent._, ExecutionContext.Implicits._
import java.util.concurrent.atomic._

class Work {
  val count = new AtomicInteger(10)
  def f(s: String) = Future {
    if (count.decrementAndGet <= 0) None else Some(s)
  } andThen {
    case Success(Some(x)) => Console println s"Calculated $x"
    case Success(None)    => Console println "Done."
    case _                => Console println "Failed."
  }
}

// Exiting paste mode, now interpreting.

import scala.util._
import concurrent._
import ExecutionContext.Implicits._
import java.util.concurrent.atomic._
defined class Work

显示Stream 版本,直到消费线程通过阻塞等待时才会计算前缀:

scala> val work = new Work
work: Work = Work@1b45c0e

scala> Stream continually work.f("hello") takeWhile { x => Await.result(x, duration.Duration.Inf).nonEmpty }
Calculated hello
res0: scala.collection.immutable.Stream[scala.concurrent.Future[Option[String]]] = Stream(scala.concurrent.impl.Promise$DefaultPromise@66629f63, ?)

scala> .toList
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Done.
res1: List[scala.concurrent.Future[Option[String]]] = List(scala.concurrent.impl.Promise$DefaultPromise@66629f63, scala.concurrent.impl.Promise$DefaultPromise@610db97e, scala.concurrent.impl.Promise$DefaultPromise@6f0628de, scala.concurrent.impl.Promise$DefaultPromise@3fabf088, scala.concurrent.impl.Promise$DefaultPromise@1e392345, scala.concurrent.impl.Promise$DefaultPromise@12f3afb5, scala.concurrent.impl.Promise$DefaultPromise@4ced35ed, scala.concurrent.impl.Promise$DefaultPromise@2c22a348, scala.concurrent.impl.Promise$DefaultPromise@7bd69e82)

scala> .foreach (Console println _.value.get)
Success(Some(hello))
Success(Some(hello))
[snip]

另一种行为,可能更可取,你会得到一个 Future 来保存计算前缀的结果:

scala> :pa
// Entering paste mode (ctrl-D to finish)

  val work = new Work
  def g(ss: List[String]): Future[List[String]] = work.f("hello") flatMap {
    case None => Future.successful(ss)
    case Some(s) => g(s :: ss)
  }

// Exiting paste mode, now interpreting.

work: Work = Work@796d3c9f
g: (ss: List[String])scala.concurrent.Future[List[String]]

scala> g(Nil)
Calculated hello
Calculated hello
res3: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@99a78d7
Calculated hello
Calculated hello
Calculated hello

scala> Calculated hello
Calculated hello
Calculated hello
Calculated hello
Done.

使用未来:

scala> .value
res5: Option[scala.util.Try[List[String]]] = Some(Success(List(hello, hello, hello, hello, hello, hello, hello, hello, hello)))

【讨论】:

  • 使用 Scalaz,您可以执行类似 StreamT.unfoldM(())(_ =&gt; f("hello").map(_.map(_ -&gt; ()))).toStream 的操作,但是,我认为标准库在这里没有提供任何快捷方式。
【解决方案2】:

Stream#continually 无休止地做同样的事情,Stream#takeWhile 在某个点停止它。 http://www.scala-lang.org/api/2.11.0/index.html#scala.collection.immutable.Stream

Stream.continually(/*remove*/).takeWhile(/*not Future[None]*/)

【讨论】:

  • 像往常一样,投反对票的人没有发表评论。您必须阻止评估。 Stream continually f("hello") takeWhile { x =&gt; Await.result(x, duration.Duration.Inf).nonEmpty }
【解决方案3】:

这里是:

import concurrent._, ExecutionContext.Implicits._
import java.util.concurrent.atomic._

val count = new AtomicInteger(10)

def f(s: String) = Future {
  if (count.decrementAndGet <= 0) None else Some(s)
}

Iterator continually {
  f("hello")
} takeWhile {
  Await.result(_, duration.Duration.Inf).nonEmpty
} foreach { _.map { _.map {
  println
}}

希望对你有帮助。

【讨论】:

  • 这与 Stream 答案相同。我更新以显示它阻塞而另一个不阻塞。现在看看布朗先生所传教的斯卡拉兹。
  • 顺便说一句,如果你想阻止,你还不如坐在一个while循环中。
猜你喜欢
  • 2023-04-06
  • 2014-03-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-09-28
  • 2016-05-26
  • 2022-08-18
  • 2015-07-04
相关资源
最近更新 更多