【问题标题】:Handle twitter4j User Stream 420 Exception处理 twitter4j 用户流 420 异常
【发布时间】:2014-10-08 07:09:56
【问题描述】:

实际问题是这样的:我打开一个用户流来填充我的一些缓存,有时,这个流得到一个 420 异常(在短时间内登录尝试次数过多。)

在尝试重新建立连接之前我应该​​等待多长时间?

   override def onException(ex: Exception): Unit = {
      Logger.info("Exception:::" + ex.getMessage + ":::" + ex.getCause)
      if (ex.getMessage.startsWith("420")) {
        // Can't authenticate for now, thus has to fill up cache hole in next start
        // Wait some time (How long?) Thread.sleep(5000L)
        // Connect via restApi and fill up the holes in the cache
        // Continue listening
      }
    }

【问题讨论】:

    标签: scala twitter streaming twitter4j


    【解决方案1】:

    我想你必须在这里使用一些退避策略,我也不会使用sleep,我会保持我的应用程序异步。

    这可能不是您问题的严格解决方案,因为它几乎是相当多的伪代码,但它可能是一个开始。首先我从 Play 借! the timeout future definition:

    import scala.language.higherKinds
    import scala.concurrent.duration.FiniteDuration
    import java.util.concurrent.TimeUnit
    import scala.concurrent.{ExecutionContext, Future, Promise => SPromise}
    import play.api.libs.concurrent.Akka
    import util.Try
    
    def timeout[A](message: => A, duration: Long, unit: TimeUnit = TimeUnit.MILLISECONDS)(implicit ec: ExecutionContext): Future[A] = {
      val p = SPromise[A]()
      Akka.system.scheduler.scheduleOnce(FiniteDuration(duration, unit)) {
        p.complete(Try(message))
      }
      p.future
    }
    

    这使用 Akka 来安排未来的执行,并结合承诺返回未来。此时,您可以使用 flatMap 在超时未来链接未来执行:

    val timeoutFuture: Future[String] = 
      timeout("timeout", duration, TimeUnit.SECONDS)
    
    timeoutFuture.flatMap(timeoutMessage => connectToStream())
    

    此时连接只有在超时后才执行,但我们仍然需要实现某种重新连接机制,为此我们可以使用recover:

    def twitterStream(duration: Long = 0, retry: Int = 0): Future[Any] = {
      val timeoutFuture: Future[String] = 
        timeout("timeout", duration, TimeUnit.SECONDS)
    
      // check how many time we tried to implement some stop trying strategy
      // check how long is the duration and if too long reset.
    
      timeoutFuture.flatMap(timeoutMessage => connectToStream())
        .recover {
          case connectionLost: SomeConnectionExpiredException => 
            twitterStream(duration + 20, retry + 1) // try to reconnect
          case ex: Exception if ex.getMessage.startsWith("420") =>
            twitterStream(duration + 120, retry + 1) // try to reconect with a longer timer
          case _ => 
            someDefault()
        }
    }
    
    def connectToStream(): Future[String] = {
      // connect to twitter
      // do some computation
      // return some future with some result
      Future("Tweets")
    }
    

    这里发生的情况是,当从未来捕获异常时,如果该异常是 420 或某些连接丢失异常,则执行恢复并在 duration + 20 秒后重新调用函数重新启动连接。

    注意几点,代码未经测试(我只能编译它),这里的退避时间也是线性的(x + y),你可能想看看一些指数backoff strategy,最后你会需要 Akka 来实现一旦在超时未来使用的时间表(Play 已经有 Akka 可用),对于在未来检查 this SO question 使用超时的其他可能性。

    不确定这一切是否矫枉过正,可能有更短更简单的解决方案。

    【讨论】:

      猜你喜欢
      • 2016-07-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-05-15
      相关资源
      最近更新 更多