【发布时间】:2016-07-23 16:27:50
【问题描述】:
我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。
播放控制器需要一个 Source 才能使用 chuncked 方法流式传输结果。
由于 Play 在后台使用其自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked 方法使用之前被消耗(除非我使用以下 hack)。
如果我使用响应式流发布者预先实现源队列,我可以让它工作,但它是一种“肮脏的黑客”:
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
有没有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?
谢谢
【问题讨论】:
-
我非常喜欢这种方法。为什么你觉得它“脏”?
标签: scala playframework akka akka-stream