【问题标题】:Splitting inside a flow in Akka-Streams在 Akka-Streams 中拆分流
【发布时间】:2015-07-31 11:24:29
【问题描述】:

我正在尝试提出一种解决方案,将我收到的传入字符串拆分为多个字符串。我一直在研究,看起来在以前版本的 Akka-Streams 中有一个类 Transformer 你可以扩展它来进行这种转换。

在我使用的版本(RC2)中有Stages,但我不确定如何实现拆分模式。

Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))

我正在寻找XXXXX 组件,它允许我输入String 并返回String 的序列,并将每一个发送到流的其余部分。

【问题讨论】:

  • 如果结果元素始终只依赖于单个输入元素,您可以使用mapConcat。如果依赖关系更复杂,您可以使用(有状态)阶段。
  • 除此之外,一般mapConcat 可以被认为是flatMap。名称不同是因为某些单子定律不成立。

标签: akka akka-stream


【解决方案1】:

我同意@jrudolph 的观点,mapConcat 可能就是您要找的东西。一个简单的例子展示了这个方法的实际效果:

  val strings = List(
  """hello
     world
     test
     this""",
     """foo
     bar
     baz
     """

  )

  implicit val system = ActorSystem("test")
  implicit val mater = ActorFlowMaterializer()
  Source(strings).
    mapConcat(_.split("\n").map(_.trim).toList).
    runForeach(println)

如果您运行此代码,您将看到打印出以下内容:

hello
world
test
this
foo     
bar
baz

【讨论】:

    【解决方案2】:

    Akka 为此类问题提供了Framing 辅助函数。

    假设您的字符集是 UTF-8,您可以编写一个函数,该函数接受分隔的 String 值的最大大小并返回一个可以执行拆分的 Flow

    import akka.stream.scaladsl.Framing
    import akka.util.ByteString
    
    val newLineSplitter : (Int) => Flow[String, String, NotUsed] = 
      (maxLineSize) =>
        Flow[String]
          .map(ByteString.apply)
          .via(Framing delimiter (ByteString("\n"), maxLineSize))
          .via(Flow[ByteString] map (_.utf8String))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-10-27
      • 2016-10-20
      • 2019-01-12
      • 1970-01-01
      • 1970-01-01
      • 2021-09-08
      • 2020-02-20
      • 1970-01-01
      相关资源
      最近更新 更多