【问题标题】:Reading fileStream with Spark Streaming使用 Spark Streaming 读取 fileStream
【发布时间】:2016-09-26 07:52:16
【问题描述】:

我在 HDFS 上有一个目录,其中每 10 分钟 复制一个文件(现有文件被覆盖)。 我想使用 Spark 流 (1.6.0) 读取文件的内容,并将其用作参考数据以将其连接到其他流。

我将“记住窗口spark.streaming.fileStream.minRememberDuration设置为“600s”并将newFilesOnly设置为false , 因为 当我启动应用程序时,我不想从已经存在的 HDFS 中获取初始数据。

val ssc = new StreamingContext(sparkConf, Seconds(2))
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
val lines: DStream[String] = 
   ssc.fileStream[LongWritable, Text, TextInputFormat](loc, defaultFilter(_), false).map(_._2.toString)
lines.foreachRDD { x => x.foreach(println) }

我的想法是将这个 DStream 的内容持久化到内存中,并委托维护的任务 这个“批量查找缓存”到 Spark。 我希望在 HDFS 目录的每次更改后自动获得新数据,我可以加入到另一个流中。

我不明白的:

  • 当我启动应用程序时,数据已加载,但如果 我在本地触摸文件并覆盖 HDFS 上我看不到的文件 它的内容不再打印出来了
  • 如何缓存和重新加载这些数据?
  • 当我缓存它时,它将在工作节点上可用,或者 这(连同连接)会在驱动程序中发生吗?

我是否也应该将 StreamingContext 时间间隔设置为 10 分钟,因为我只会每 10 分钟更改一次?

【问题讨论】:

    标签: scala hadoop apache-spark spark-streaming


    【解决方案1】:

    只是一些原始的想法。

    当我启动应用程序时,数据已加载,但如果我触摸 本地文件并覆盖 HDFS 上的文件我看不到它的内容 打印出来了

    为了让 Spark Streaming 处理数据,必须以原子方式创建文件,例如通过将文件移动到 Spark 正在监视的目录中。文件重命名操作通常是原子的。能否请您测试一下以验证它是否正常工作?

    如何缓存和重新加载这些数据? 当我缓存它时,它将在工作节点上可用,或者这个 (连同连接)会发生在驱动程序中吗?

    直接的解决方案可能是在foreachRDD() 方法中注册临时表。当流式传输过程中出现新数据时,可以重新创建适当的表。请记住,foreachRDD() 方法中的逻辑应该是幂等的。

    知道表名后,您可以轻松地创建一个单独的查询管道,该管道将连接来自该预缓存临时表的数据。只需确保将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。

    我是否也应该将 StreamingContext 时间间隔设置为 10 分钟 我只会每 10 分钟更改一次?

    在理想情况下,节奏应该匹配。为了安全起见,您也可以在foreachRDD() 方法中收到新数据时检查时间戳。

    【讨论】:

      猜你喜欢
      • 2018-01-28
      • 2015-12-03
      • 2015-04-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-24
      • 2018-01-17
      • 1970-01-01
      相关资源
      最近更新 更多