【问题标题】:How to bind akka http with akka streams?如何将akka http与akka流绑定?
【发布时间】:2017-01-30 04:15:56
【问题描述】:

我正在尝试使用流而不是纯actor来处理http请求,我提供了以下代码:

trait ImagesRoute {

  val log = LoggerFactory.getLogger(this.getClass)

  implicit def actorRefFactory: ActorRefFactory
  implicit def materializer: ActorMaterializer

  val source =
    Source
      .actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
      .via(Flow[Image].mapAsync(1)(ImageRepository.add))
      .toMat(Sink.asPublisher(true))(Keep.both)

  val route = {
    pathPrefix("images") {
      pathEnd {
        post {
          entity(as[Image]) { image =>

            val (ref, publisher) = source.run()

            val addFuture = Source.fromPublisher(publisher)

            val future = addFuture.runWith(Sink.head[Option[Image]])

            ref ! image

            onComplete(future.mapTo[Option[Image]]) {
              case Success(img) =>
                complete(Created, img)

              case Failure(e) =>
                log.error("Error adding image resource", e)
                complete(InternalServerError, e.getMessage)
            }
          }
        }
      }
    }
  }
}

我不确定这是否是正确的方法,或者即使这是一个好方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部,流一切。

有什么想法吗?

【问题讨论】:

  • 如果我没记错的话,在你的情况下你根本不需要流。据我了解,ImageRepository.add 方法返回一个Future;你需要做的就是写onComplete(ImageRepository.add(image)),仅此而已。
  • @VladimirMatveev 是的,没错,这只是一个简单的例子,但是流管道应该更大,可以做很多事情,比如联系外部资源并最终背压......

标签: scala akka akka-stream akka-http


【解决方案1】:

如果您只希望实体提供 1 张图片,则无需从 ActorRef 创建Source,也不需要Sink.asPublisher,只需使用Source.single

def imageToComplete(img : Option[Image]) : StandardRoute = 
  img.map(i => complete(Created, i))
     .getOrElse {
       log error ("Error adding image resource", e)
       complete(InternalServerError, e.getMessage
     }

...

entity(as[Image]) { image =>

  val future : Future[StandardRoute] = 
    Source.single(image)
          .via(Flow[Image].mapAsync(1)(ImageRepository.add))
          .runWith(Sink.head[Option[Image]])
          .map(imageToComplete)

  onComplete(future)
}

进一步简化您的代码,您只处理 1 个图像这一事实意味着 Streams 是不必要的,因为只需 1 个元素就不需要背压:

val future : Future[StandardRoute] = ImageRepository.add(image)
                                                    .map(imageToComplete)

onComplete(future)

在你指出的 cmets 中

"这只是一个简单的例子,但是流管道应该是 做很多事情,比如联系外部资源和 最终背压的东西”

这仅适用于您的实体是图像流的情况。如果每个 HttpRequest 只处理 1 个图像,那么背压永远不会应用,并且您创建的任何流都将是 slower version of a Future

如果您的实体实际上是图像流,那么您可以将其用作流的一部分:

val byteStrToImage : Flow[ByteString, Image, _] = ???

val imageToByteStr : Flow[Image, Source[ByteString], _] = ???

def imageOptToSource(img : Option[Image]) : Source[Image,_] =
  Source fromIterator img.toIterator

val route = path("images") {
  post {
    extractRequestEntity { reqEntity =>

      val stream = reqEntity.via(byteStrToImage)
                            .via(Flow[Image].mapAsync(1)(ImageRepository.add))
                            .via(Flow.flatMapConcat(imageOptToSource))
                            .via(Flow.flatMapConcat(imageToByteStr))

      complete(HttpResponse(status=Created,entity = stream))
    }
  }
}    

【讨论】:

  • 谢谢,这很有意义
猜你喜欢
  • 2017-07-17
  • 2018-07-08
  • 1970-01-01
  • 2019-04-16
  • 1970-01-01
  • 2019-12-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多