【发布时间】:2016-09-26 07:52:16
【问题描述】:
我在 HDFS 上有一个目录,其中每 10 分钟 复制一个文件(现有文件被覆盖)。 我想使用 Spark 流 (1.6.0) 读取文件的内容,并将其用作参考数据以将其连接到其他流。
我将“记住窗口”spark.streaming.fileStream.minRememberDuration设置为“600s”并将newFilesOnly设置为false , 因为
当我启动应用程序时,我不想从已经存在的 HDFS 中获取初始数据。
val ssc = new StreamingContext(sparkConf, Seconds(2))
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
val lines: DStream[String] =
ssc.fileStream[LongWritable, Text, TextInputFormat](loc, defaultFilter(_), false).map(_._2.toString)
lines.foreachRDD { x => x.foreach(println) }
我的想法是将这个 DStream 的内容持久化到内存中,并委托维护的任务 这个“批量查找缓存”到 Spark。 我希望在 HDFS 目录的每次更改后自动获得新数据,我可以加入到另一个流中。
我不明白的:
- 当我启动应用程序时,数据已加载,但如果 我在本地触摸文件并覆盖 HDFS 上我看不到的文件 它的内容不再打印出来了
- 如何缓存和重新加载这些数据?
- 当我缓存它时,它将在工作节点上可用,或者 这(连同连接)会在驱动程序中发生吗?
我是否也应该将 StreamingContext 时间间隔设置为 10 分钟,因为我只会每 10 分钟更改一次?
【问题讨论】:
标签: scala hadoop apache-spark spark-streaming