【发布时间】:2016-03-31 16:07:02
【问题描述】:
我知道使用 ActorMaterialzer 上的监督策略可以在出错时重新启动 akka-stream
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
来源:http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html
我有以下用例。
/***
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.2",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future
object SO extends DefaultJsonProtocol {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")
def search(query: Char) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, request)
}
case class Hello(name: String)
implicit val helloFormat = jsonFormat1(Hello)
val searches =
Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
case (Success(response), _) => Unmarshal(response).to[Hello]
case (Failure(e), _) => Future.failed(e)
}
def main(): Unit = {
Await.result(searches.runForeach(_ => println), Duration.Inf)
()
}
}
有时查询将无法解组。我想对该单个查询使用重试策略
https://example.org/?q=v 无需重新启动整个字母表。
【问题讨论】:
-
您想对下一个字母使用相同的查询,还是想用相同的字母重试相同的查询,或者您想使用不同的查询?如果您只想尝试下一个查询,您可以将监督策略添加到
mapAsync,例如.withAttributes(supervisionStrategy(resumingDecider))(但它在您链接的文档中,所以我假设您在寻找其他东西) -
是的,重试单个查询,相同的字母。如果
v失败并返回 500,我想再次重试v两次。 -
您是否需要将其作为流程的一部分运行,因为您可以将其作为单独的流程运行,然后使用标准的未来机制重试,我认为这会更容易。否则我认为你必须创建一个图表来反馈失败的查询。如果您有兴趣,我可以向您展示任何一种或两种解决方案。
-
我会对图形解决方案感兴趣
标签: scala akka-stream akka-http akka-supervision