【问题标题】:How to retry failed Unmarshalling of a stream of akka-http requests?如何重试失败的akka​​-http请求流解组?
【发布时间】:2016-03-31 16:07:02
【问题描述】:

我知道使用 ActorMaterialzer 上的监督策略可以在出错时重新启动 akka-stream

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

来源:http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html

我有以下用例。

/***
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-experimental"            % "2.4.2",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/

import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future

object SO extends DefaultJsonProtocol {

  implicit val system = ActorSystem()
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")

  def search(query: Char) = {
    val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
    (request, request)
  }

  case class Hello(name: String)
  implicit val helloFormat = jsonFormat1(Hello)

  val searches =
    Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
      case (Success(response), _) => Unmarshal(response).to[Hello]
      case (Failure(e), _) => Future.failed(e)
    }

  def main(): Unit = {
    Await.result(searches.runForeach(_ => println), Duration.Inf)
    ()
  }
}

有时查询将无法解组。我想对该单个查询使用重试策略 https://example.org/?q=v 无需重新启动整个字母表。

【问题讨论】:

  • 您想对下一个字母使用相同的查询,还是想用相同的字母重试相同的查询,或者您想使用不同的查询?如果您只想尝试下一个查询,您可以将监督策略添加到 mapAsync,例如 .withAttributes(supervisionStrategy(resumingDecider))(但它在您链接的文档中,所以我假设您在寻找其他东西)
  • 是的,重试单个查询,相同的字母。如果v 失败并返回 500,我想再次重试v 两次。
  • 您是否需要将其作为流程的一部分运行,因为您可以将其作为单独的流程运行,然后使用标准的未来机制重试,我认为这会更容易。否则我认为你必须创建一个图表来反馈失败的查询。如果您有兴趣,我可以向您展示任何一种或两种解决方案。
  • 我会对图形解决方案感兴趣

标签: scala akka-stream akka-http akka-supervision


【解决方案1】:

我认为用 supervsior 策略实现它会很困难(或不可能),主要是因为您想重试“n”次(根据 cmets 中的讨论),而且我认为您无法跟踪使用监督时元素被尝试的次数。

我认为有两种方法可以解决这个问题。要么将有风险的操作作为单独的流处理,要么创建一个图形来进行错误处理。我将提出两种解决方案。

还要注意 Akka Streams distinguishes between errors and failures,所以如果你不处理你的失败,它们最终会崩溃流(如果没有引入策略),所以在下面的示例中我将它们转换为 Either,它代表成功或错误。

单独的流

您可以做的是将每个字母视为一个单独的流,并使用重试策略分别处理每个字母的失败,以及一些延迟。

// this comes after your helloFormat

// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
  f.recoverWith {
    // you may want to only handle certain exceptions here...
    case ex: Exception if c > 0 =>
      println(s"failed - will retry ${c - 1} more times")
      akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
  }

val singleElementFlow = httpFlow.mapAsync[Hello](1) {
  case (Success(response), _) =>
    val f = Unmarshal(response).to[Hello]
    f.recoverWith {
      case ex: Exception =>
        // see https://github.com/akka/akka/issues/20192
        response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
    }
  case (Failure(e), _) => Future.failed(e)
}

// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
  Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
    println(s"trying $elem")
    retry(
      Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
      1.seconds, 3
    ).map(ok => Right(ok)).recover { case ex => Left(ex) }
  }
// end

图表

此方法会将失败整合到图表中,并允许重试。此示例使所有请求并行运行,并希望重试那些失败的请求,但如果您不希望这种行为并一个接一个地运行它们,我相信您也可以这样做。

// this comes after your helloFormat

// you may need to have your own class if you
// want to propagate failures for example, but we will use
// right value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]

def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
  val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
  (request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}

val g = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val searches = b.add(Flow[Char])

  val tryParse =
    Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
      case (Success(response), (req, tries)) =>
        println(s"trying parse response to $req for $tries")
        Unmarshal(response).to[Hello].
          map(h => Right(h)).
          recoverWith {
            case ex: Exception =>
              // see https://github.com/akka/akka/issues/20192
              response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
                Left((req, tries + 1))
              }
          }
      case (Failure(e), _) => Future.failed(e)
    }

  val broadcast = b.add(Broadcast[ParseResult](2))

  val nonErrors = b.add(Flow[ParseResult].collect {
    case Right(x) => x
    // you may also handle here Lefts which do exceeded retries count
  })

  val errors = Flow[ParseResult].collect {
    case Left(x) if x._2 < 3 => (x._1, x)
  }
  val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))

  // @formatter:off
  searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
                          merge.preferred <~ errors <~ broadcast
  // @formatter:on

  FlowShape(searches.in, nonErrors.out)
}

def main(args: Array[String]): Unit = {
  val source = Source('a' to 'z')
  val sink = Sink.seq[Hello]

  source.via(g).toMat(sink)(Keep.right).run().onComplete {
    case Success(seq) =>
      println(seq)
    case Failure(ex) =>
      println(ex)
  }

}

基本上这里发生的事情是我们通过httpFlow 运行搜索,然后尝试解析响应,我们 然后广播结果并拆分错误和非错误,非错误进入接收器,并发送错误 回到循环。如果重试次数超过计数,我们忽略该元素,但你也可以这样做 别的东西。

无论如何,我希望这能给你一些想法。

【讨论】:

  • 非常努力。我会为你的答案开一个赏金。我认为 Graph 应该是 FlowShape,因为我仍然想在之后处理 Hello。你也确定这个合并是正确的吗? stackoverflow.com/a/33962702/449071
  • 如果你想从外部添加输入并在之后处理它,那么可以,如果你只是想在之后处理Hallo,但它来自Graph,它可以是SourceShape。然后你不要将它包装在RunnableGraph 中,你可以像Source.fromGraph(g).runWith(Sink.seq[Hello]) 一样运行它(例如)。是的,如果您需要完成您的流,您需要使合并急切,就像在您发送的链接中一样。我会更新我的答案。
【解决方案2】:

对于上面的流解决方案,流中最后一个元素的任何重试都不会执行。这是因为当上游在发送最后一个元素后完成时,合并也将完成。之后,唯一的输出来自非重试出口,但由于元素要重试,所以也完成了。

如果您需要所有输入元素来生成输出,您将需要一个额外的机制来阻止上游完成到达进程和重试图。一种可能性是使用 BidiFlow 来监视进程和重试图中的输入和输出,以确保在传播 oncomplete 之前已生成所有必需的输出(对于观察到的输入)。在简单的情况下,可能只是计算输入和输出元素。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-06
    • 1970-01-01
    • 2014-12-14
    • 2012-04-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多