【发布时间】:2018-01-29 16:51:10
【问题描述】:
我有以下 sn-p,它读取一个 CSV 文件并将一些内容打印到控制台:
def readUsingAkkaStreams = {
import java.io.File
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import java.security.MessageDigest
implicit val system = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
val file = new File("/path/to/csv/file.csv")
val fileSource = FileIO.fromFile(file, 65536)
val flow = fileSource.map(chunk => chunk.utf8String)
flow.to(Sink.foreach(println(_))).run
}
我现在对此有一些疑问:
块大小是以字节为单位的大小。内部如何处理?我的意思是我最终会遇到一个块可能只包含一行中的部分元素的情况吗?
此流如何终止?现在没有!我想让它知道它已经完全读取了文件并且应该触发停止信号!有没有一种机制可以做到这一点?
编辑 1:根据下面帖子的建议,我收到如屏幕截图所示的错误消息!
编辑 2:
通过设置 maximumFrameLength 以匹配最大块大小的大小(即 65536)来消除错误。
val file = new File("/path/to/csf/file.csv")
val chunkSize = 65536
val fileSource = FileIO.fromFile(file, chunkSize).via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = chunkSize,
allowTruncation = true))
【问题讨论】:
-
在 Alpakka 中使用 CsvParser 支持:developer.lightbend.com/docs/alpakka/current/…
标签: scala akka-stream