【发布时间】:2017-01-30 04:15:56
【问题描述】:
我正在尝试使用流而不是纯actor来处理http请求,我提供了以下代码:
trait ImagesRoute {
val log = LoggerFactory.getLogger(this.getClass)
implicit def actorRefFactory: ActorRefFactory
implicit def materializer: ActorMaterializer
val source =
Source
.actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.toMat(Sink.asPublisher(true))(Keep.both)
val route = {
pathPrefix("images") {
pathEnd {
post {
entity(as[Image]) { image =>
val (ref, publisher) = source.run()
val addFuture = Source.fromPublisher(publisher)
val future = addFuture.runWith(Sink.head[Option[Image]])
ref ! image
onComplete(future.mapTo[Option[Image]]) {
case Success(img) =>
complete(Created, img)
case Failure(e) =>
log.error("Error adding image resource", e)
complete(InternalServerError, e.getMessage)
}
}
}
}
}
}
}
我不确定这是否是正确的方法,或者即使这是一个好方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部,流一切。
有什么想法吗?
【问题讨论】:
-
如果我没记错的话,在你的情况下你根本不需要流。据我了解,
ImageRepository.add方法返回一个Future;你需要做的就是写onComplete(ImageRepository.add(image)),仅此而已。 -
@VladimirMatveev 是的,没错,这只是一个简单的例子,但是流管道应该更大,可以做很多事情,比如联系外部资源并最终背压......
标签: scala akka akka-stream akka-http