【发布时间】:2021-01-01 08:26:10
【问题描述】:
我有 2 个带有排序数据的 csv 文件: 文件 1:已排序的数字 (~1GB) 文件 2:排序后的数字 + 额外数据 (~20GB)
我需要在文件 2 中查找文件 1 中的所有数字并进行一些处理(文件 2 中不存在于文件 1 中的数字被跳过)。
到目前为止,我有:
object MainQueue extends IOApp {
override def run(args: List[String]): IO[ExitCode] =
program[IO].compile.drain.as(ExitCode.Success)
def program[F[_]: Sync: ContextShift](): Stream[F, Unit] =
for {
number <- numberStream
record <- records
.through(parser())
.through(findRecord(number))
_ <- Stream.emit(println(s"$number <-> $record"))
} yield ()
def findRecord[F[_]](phone: Long): Pipe[F, Long, Long] =
_.dropWhile(r => {
println(s"Reading $r")
r < phone
}).head //halts the stream
def numberStream[F[_]](): Stream[F, Long] =
Stream(100L, 120L)
//TODO: make stream continue and not halt and restart
def records[F[_]: Sync: ContextShift](): Stream[F, String] =
Stream
.resource(Blocker[F])
.flatMap { bec =>
readAll[F](Paths.get("small.csv"), bec, 4096)
}
.through(text.utf8Decode)
.through(text.lines)
def parser[F[_]](): Pipe[F, String, Long] = ??? //parse
def writer[F[_]](): Pipe[F, Long, Unit] =
_.map(v => {
println(s"Found: $v")
})
}
哪些打印:
Reading 50
Reading 100
100 <-> 100
Reading 50
Reading 100
Reading 120
120 <-> 120
这意味着第二个流为文件 1 中的每个值重新启动,我如何保持上次读取的位置并从那里开始?数字是排序的,所以没有点重新开始。 我对 scala 和 fs2 非常陌生,因此非常感谢您解释我的误解。
谢谢!
【问题讨论】:
-
两个文件中的数字是否不同或单个文件可能包含重复项?
-
File1 可能有重复,而 file2 没有。所有数字按升序排列。 File1 不是一个子集,因为它的数字的一小部分从 file2 中丢失,但这些情况可以跳过/忽略