【问题标题】:Stream files from HDFS using Apache Spark Steaming使用 Apache Spark Streaming 从 HDFS 流式传输文件
【发布时间】:2018-12-15 04:30:14
【问题描述】:

如何使用 apache spark 流式传输已存在于 HDFS 中的文件?

我有一个非常具体的用例,我有数百万客户数据,我想使用 apache 流在客户级别处理它们。目前我正在尝试做的是获取整个客户数据集并在 customerId 上对其进行 repartition 并创建 100 个这样的分区并确保单个客户的多个记录被传递流。

现在我拥有了 HDFS 位置中的所有数据

hdfs:///tmp/dataset

现在使用上面的 HDFS 位置,我想流式传输将读取 parquet 文件获取数据集的文件。我尝试了以下方法,但没有运气。

// start stream
val sparkConf = new SparkConf().setAppName("StreamApp")

// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(60))
val dstream = ssc.sparkContext.textFile("hdfs:///tmp/dataset")

println("dstream: " + dstream)
println("dstream count: " + dstream.count())
println("dstream context: " + dstream.context)

ssc.start()
ssc.awaitTermination()


NOTE: This solution doesn't stream data it just reads data from HDFS

// start stream
val sparkConf = new SparkConf().setAppName("StreamApp")

// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(60))
val dstream = ssc.textFileStream("hdfs:///tmp/dataset")

println("dstream: " + dstream)
println("dstream count: " + dstream.count())
println("dstream context: " + dstream.context)

dstream.print()
ssc.start()
ssc.awaitTermination()

我总是得到 0 结果。如果 HDFS 中已经存在没有新文件发布的文件,则可以从 HDFS 流式传输文件。

【问题讨论】:

  • 最简单的方法是在流上下文启动后将文件移动到hdfs:///tmp/dataset

标签: scala apache-spark apache-spark-sql hdfs spark-streaming


【解决方案1】:

TL;DR 目前 spark 不支持此功能。最接近的方法是在启动流式上下文后将文件移动到 hdfs:///tmp/dataset


textFileStream 内部使用FileInputDStream,其中有一个选项newFilesOnly。但这不会处理所有现有文件,而只会处理在流式传输上下文之前一分钟内修改的文件(由配置值 spark.streaming.fileStream.minRememberDuration 设置)。作为described in jira issue

当您将 newFilesOnly 设置为 false 时,这意味着此 FileInputDStream 不仅会处理即将到来的文件,还会包含过去 1 分钟内出现的文件(不是所有旧文件)。 FileInputDStream.MIN_REMEMBER_DURATION 中定义的时间长度。

或者

您可以在启动流式上下文之前从现有文件中创建(正常)RDD。以后可以和stream RDD一起使用。

【讨论】:

    猜你喜欢
    • 2014-11-11
    • 2015-06-16
    • 1970-01-01
    • 2019-01-20
    • 2017-06-14
    • 2015-12-18
    • 2015-12-12
    • 2021-07-21
    • 2018-04-23
    相关资源
    最近更新 更多