【问题标题】:Akka Streams RestartSource.onFailuresWithBackoff stop conditionAkka Streams RestartSource.onFailuresWithBackoff 停止条件
【发布时间】:2023-04-03 13:44:01
【问题描述】:

如果出现异常,我正在使用 RestartSource.onFailuresWithBackoff 重新启动源,但如果收到某种异常类型,我想停止(取消)重新启动。例如:

RestartSource
  .onFailuresWithBackoff(
    minBackoff = 1,
    maxBackoff = 5,
    randomFactor = 0.2,
    maxRestarts = 3
  ) { () => 
    val responseFuture = doSomeAsyncTask().recover {
      case SomeSpecialError =>
        // I want to quit from the restarts
      case NonFatal(ex) => 
        // Re-throw so that the Source is restarted
        throw ex
    }

    Source
      .future(responseFuture)
      .mapAsync(parallelism = 1)(Future.successful(_))
}

我尝试在包装的 Source 和 RestartSource 上设置监督策略,但事件从未到达它。出于这个原因,同样的解释也适用于尝试在 Sink 运算符上执行此操作。

【问题讨论】:

    标签: scala akka-stream


    【解决方案1】:

    根据RestartSource.onFailuresWithBackoff 文档,您只需完成源(不发出任何内容)以防止重新启动。

    实现此目的的一种方法是,如果doSomeAsyncTask 产生Future[T],则将其映射为Future[Option[T]],然后将可区分的失败恢复为成功的None。然后在流源中:

    • 如果最初的未来因其他异常而失败,则源将失败并重新启动
    • 如果最初的未来因明显异常而失败,我们会过滤掉它,以便源完成而不发出任何内容
    • 如果原来的 future 成功了,我们会正常传递那个值

    例如:

    RestartSource.onFailuresWithBackoff(
      // yada yada yada
    ) { () =>
      val baseFuture = doSomeAsyncTask().map(Option(_))
      val tweakedFuture = baseFuture.recoverWith {
        case SomeSpecialError => Future.successful(None)
        case NonFatal(e) => baseFuture  // including for clarity
      }
    
      Source.future(tweakedFuture)
        .mapConcat(_.toList)  // swallows the None arising from `SomeSpecialError`
        // the mapAsync in your question is pointless, so I've omitted it,
        // but if it's a placeholder for something else, you'd put it here
    }
    

    以上假设doSomeAsyncTask() 永远不会导致null 成功,但由于nulls 不应该通过Akka Stream 传递并且您没有处理它,这可能是一个相当安全的假设.

    【讨论】:

    • 感谢您的解决方案!我确实完成了它,但感觉有点hacky,这就是为什么我想要另一个解决方案,如果可能的话。无论如何,你的解决方案比我的更干净,所以我真的很感激!
    猜你喜欢
    • 2016-11-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-04-13
    • 1970-01-01
    • 2016-10-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多