【问题标题】:Spark Streaming : Receive stream for each files under the directorySpark Streaming:接收目录下每个文件的流
【发布时间】:2017-09-13 18:26:39
【问题描述】:

我正在使用 Spark Core 和 SQL 来处理给定目录下的多个 csv 文件(都具有不同的结构)。到目前为止,我们必须手动将文件放在所需位置并运行作业。

 val rdd1 = sc.textFile("csv1")
 /* transformations here for rdd1 */
 val rdd2 = sc.textFile("csv2")
 /* transformations here for rdd2 */
 val rdd3 = sc.textFile("csv3")
 /* transformations here for rdd3 */

但是现在,我想在文件到达目录时使用 Spark Streaming 进行相同的处理。 我不明白如何接收与给定文件有关的流并对其进行处理。 此外,我将在每 15 天后收到文件。

我怎样才能达到同样的效果?任何帮助将不胜感激。

谢谢!!!

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    你看过文件流吗?

    文件流:为了从与 HDFS API 兼容的任何文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以将 DStream 创建为:

    Scala 代码: streamingContext.fileStreamKeyClass、ValueClass、InputFormatClass

    Spark Streaming 将监视目录 dataDirectory 并处理在该目录中创建的任何文件(不支持写入嵌套目录中的文件)。请注意

    • 文件必须具有相同的数据格式。
    • 必须在 dataDirectory 中创建文件,方法是自动将它们移动或重命名到数据目录中。
    • 一旦移动,文件不得更改。因此,如果文件被连续追加,则不会读取新数据。

    对于简单的文本文件,有一个更简单的方法 streamingContext.textFileStream(dataDirectory)。并且文件流不需要运行接收器,因此不需要分配内核。

    【讨论】:

    • 是的,我正在调查它。 val ssc = new StreamingContext(sc, Seconds(60)) val x = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs path") val result = x.map{case (x, y) => (x.toString, y.toString)} result.print()。我可以将文件内容作为字符串值获取,但是如何获取正在处理的文件的名称?
    • 我也可以将它映射到特定的案例类。但问题是我必须分别处理多个目录,每个目录都有一组相互关联的文件。
    猜你喜欢
    • 1970-01-01
    • 2018-02-11
    • 2015-09-08
    • 2015-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-20
    • 1970-01-01
    相关资源
    最近更新 更多