【问题标题】:How to use an Akka Streams SourceQueue with PlayFramework如何将 Akka Streams SourceQueue 与 PlayFramework 一起使用
【发布时间】:2016-07-23 16:27:50
【问题描述】:

我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中。 播放控制器需要一个 Source 才能使用 chuncked 方法流式传输结果。
由于 Play 在后台使用其自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源会在 chunked 方法使用之前被消耗(除非我使用以下 hack)。

如果我使用响应式流发布者预先实现源队列,我可以让它工作,但它是一种“肮脏的黑客”:

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }

有没有更简单的方法将 Akka Streams SourceQueue 与 PlayFramework 结合使用?

谢谢

【问题讨论】:

  • 我非常喜欢这种方法。为什么你觉得它“脏”?

标签: scala playframework akka akka-stream


【解决方案1】:

解决方案是在源上使用mapMaterializedValue 来获得其队列实现的未来:

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    }
    Ok.chunked(queueSource)

  }

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }

【讨论】:

  • 为什么如果我做queueSource.map { _.toUpperCase },例如我没有得到一个Source[String,NotUsed]?相反,这会返回错误Expression of type queueSource.Repr[String] doesn't conform to expected type Source[String,NotUsed]. 您将在哪里对源的元素进行转换?就像your example中的记号一样
  • 你能用 Java 做到这一点吗?
  • Source.preMaterialize 也可以用来代替peekMatValue 方法
【解决方案2】:

想分享我今天得到的一个见解,虽然它可能不适合你在 Play 中的情况。

与其考虑触发Source,不如将​​问题颠倒过来,并为执行采购的函数提供Sink

在这种情况下,Sink 将是“配方”(非实现)阶段,我们现在可以使用 Source.queue 并立即实现它。排队了得到了它运行的流程。

【讨论】:

  • 很有趣,我很乐意看到一个例子:)
  • @Loic 我会,但我使用它的代码目前是封闭源代码。再考虑几天(低火),我认为这与在上层制作 SourceQueue 相同,并将“提供”功能暴露给任何想要使用源的人。
猜你喜欢
  • 2016-06-21
  • 1970-01-01
  • 2013-04-11
  • 1970-01-01
  • 1970-01-01
  • 2017-07-03
  • 2014-07-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多