【问题标题】:Akka-Stream stream within stream流中的 Akka-Stream 流
【发布时间】:2019-09-24 07:05:44
【问题描述】:

我正在尝试弄清楚如何处理这样一种情况,即在您的一个阶段中,您需要进行一个返回 InputStream 的调用,我将在其中将该流作为进一步向下阶段的源来处理。

例如

 Source.map(e => Calls that return an InputStream)
 .via(processingFlow).runwith(sink.ignore)

我希望进入处理流的元素与来自 InputStream 的元素一样。这基本上是一种情况,我正在跟踪文件,读取每一行,该行给我有关我需要针对 CLI api 进行的调用的信息,在进行该调用时,我将 Stdout 作为 InputStream 从中读取结果。结果大部分时间都会很大,所以我可以把所有的东西都收集在内存中。

【问题讨论】:

    标签: akka-stream


    【解决方案1】:
    • 您可以使用StreamConverters 实用程序从java.io 流中获取Sources 和Sinks。更多信息here
    • 您可以使用flatMapConcatflatMapMergeSources 的流扁平化为单个流。更多信息here

    一个简单的例子可以是:

      val source: Source[String, NotUsed] = ???
      def gimmeInputStream(name: String): InputStream = ???
      val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
    
      source
        .map(gimmeInputStream)
        .flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192))
        .via(processingFlow)
        .runWith(Sink.ignore)
    

    但是,Akka Streams 提供了一种更惯用的 DSL 来读取/写入 FileIO 对象中的文件。更多信息here.

    例子变成:

      val source: Source[String, NotUsed] = ???
      val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
    
      source
        .flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name)))
        .via(processingFlow)
        .runWith(Sink.ignore)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-10
      • 1970-01-01
      • 2017-05-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-12
      • 2021-06-03
      相关资源
      最近更新 更多