【发布时间】:2023-04-03 13:44:01
【问题描述】:
如果出现异常,我正在使用 RestartSource.onFailuresWithBackoff 重新启动源,但如果收到某种异常类型,我想停止(取消)重新启动。例如:
RestartSource
.onFailuresWithBackoff(
minBackoff = 1,
maxBackoff = 5,
randomFactor = 0.2,
maxRestarts = 3
) { () =>
val responseFuture = doSomeAsyncTask().recover {
case SomeSpecialError =>
// I want to quit from the restarts
case NonFatal(ex) =>
// Re-throw so that the Source is restarted
throw ex
}
Source
.future(responseFuture)
.mapAsync(parallelism = 1)(Future.successful(_))
}
我尝试在包装的 Source 和 RestartSource 上设置监督策略,但事件从未到达它。出于这个原因,同样的解释也适用于尝试在 Sink 运算符上执行此操作。
【问题讨论】:
标签: scala akka-stream