我想你必须在这里使用一些退避策略,我也不会使用sleep,我会保持我的应用程序异步。
这可能不是您问题的严格解决方案,因为它几乎是相当多的伪代码,但它可能是一个开始。首先我从 Play 借! the timeout future definition:
import scala.language.higherKinds
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise => SPromise}
import play.api.libs.concurrent.Akka
import util.Try
def timeout[A](message: => A, duration: Long, unit: TimeUnit = TimeUnit.MILLISECONDS)(implicit ec: ExecutionContext): Future[A] = {
val p = SPromise[A]()
Akka.system.scheduler.scheduleOnce(FiniteDuration(duration, unit)) {
p.complete(Try(message))
}
p.future
}
这使用 Akka 来安排未来的执行,并结合承诺返回未来。此时,您可以使用 flatMap 在超时未来链接未来执行:
val timeoutFuture: Future[String] =
timeout("timeout", duration, TimeUnit.SECONDS)
timeoutFuture.flatMap(timeoutMessage => connectToStream())
此时连接只有在超时后才执行,但我们仍然需要实现某种重新连接机制,为此我们可以使用recover:
def twitterStream(duration: Long = 0, retry: Int = 0): Future[Any] = {
val timeoutFuture: Future[String] =
timeout("timeout", duration, TimeUnit.SECONDS)
// check how many time we tried to implement some stop trying strategy
// check how long is the duration and if too long reset.
timeoutFuture.flatMap(timeoutMessage => connectToStream())
.recover {
case connectionLost: SomeConnectionExpiredException =>
twitterStream(duration + 20, retry + 1) // try to reconnect
case ex: Exception if ex.getMessage.startsWith("420") =>
twitterStream(duration + 120, retry + 1) // try to reconect with a longer timer
case _ =>
someDefault()
}
}
def connectToStream(): Future[String] = {
// connect to twitter
// do some computation
// return some future with some result
Future("Tweets")
}
这里发生的情况是,当从未来捕获异常时,如果该异常是 420 或某些连接丢失异常,则执行恢复并在 duration + 20 秒后重新调用函数重新启动连接。
注意几点,代码未经测试(我只能编译它),这里的退避时间也是线性的(x + y),你可能想看看一些指数backoff strategy,最后你会需要 Akka 来实现一旦在超时未来使用的时间表(Play 已经有 Akka 可用),对于在未来检查 this SO question 使用超时的其他可能性。
不确定这一切是否矫枉过正,可能有更短更简单的解决方案。