【问题标题】:Subscribing to an Akka stream, reading files订阅 Akka 流,读取文件
【发布时间】:2019-06-26 21:01:42
【问题描述】:

我正在尝试使用 Akka Streams 读取 Scala 中的文件,我想将结果放入列表中。我尝试了以下代码,该列表在接收器内使用新值递增,但在外部我得到一个空列表。

def readStream (path : String, date : String) : List[Array[String]] = {
  var lines: List[scala.Array[String]] = List[scala.Array[String]]()

  implicit val system = ActorSystem("Sys")
  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  val sink: Sink[String, Future[Done]] = Sink.foreach((x : String) => {
    val list : List[scala.Array[String]] = List(x.split("|"))
    lines = lines ++ list
    // println(lines.length)
  })

  val result: Unit = FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
    .to(sink)
    .run()
  lines
}

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    三件事:(1)将actor系统和物化器传递给您的方法(显式或作为隐式参数)而不是在方法内部创建它们,(2)使用Sink.seq,以及(3)使用toMatKeep.right获取Sink的物化值(to保留Source的物化值):

    val result: Future[Seq[String]] =
      FileIO.fromPath(...)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
        .toMat(Sink.seq)(Keep.right)
        .run()
    

    或者,使用toMatKeep.right 的简写是runWith

    val result: Future[Seq[String]] =
      FileIO.fromPath(...)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
        .runWith(Sink.seq)
    

    【讨论】:

      猜你喜欢
      • 2016-10-14
      • 2017-12-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多