【发布时间】:2018-12-15 04:30:14
【问题描述】:
如何使用 apache spark 流式传输已存在于 HDFS 中的文件?
我有一个非常具体的用例,我有数百万客户数据,我想使用 apache 流在客户级别处理它们。目前我正在尝试做的是获取整个客户数据集并在 customerId 上对其进行 repartition 并创建 100 个这样的分区并确保单个客户的多个记录被传递流。
现在我拥有了 HDFS 位置中的所有数据
hdfs:///tmp/dataset
现在使用上面的 HDFS 位置,我想流式传输将读取 parquet 文件获取数据集的文件。我尝试了以下方法,但没有运气。
// start stream
val sparkConf = new SparkConf().setAppName("StreamApp")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(60))
val dstream = ssc.sparkContext.textFile("hdfs:///tmp/dataset")
println("dstream: " + dstream)
println("dstream count: " + dstream.count())
println("dstream context: " + dstream.context)
ssc.start()
ssc.awaitTermination()
NOTE: This solution doesn't stream data it just reads data from HDFS
和
// start stream
val sparkConf = new SparkConf().setAppName("StreamApp")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(60))
val dstream = ssc.textFileStream("hdfs:///tmp/dataset")
println("dstream: " + dstream)
println("dstream count: " + dstream.count())
println("dstream context: " + dstream.context)
dstream.print()
ssc.start()
ssc.awaitTermination()
我总是得到 0 结果。如果 HDFS 中已经存在没有新文件发布的文件,则可以从 HDFS 流式传输文件。
【问题讨论】:
-
最简单的方法是在流上下文启动后将文件移动到
hdfs:///tmp/dataset。
标签: scala apache-spark apache-spark-sql hdfs spark-streaming