【问题标题】:Read HDFS file as stream将 HDFS 文件作为流读取
【发布时间】:2025-12-22 13:30:06
【问题描述】:

我的 DataLake (HDFS) adl://datalake.azuredatalakestore.net/Data/prod/2018/2018-02/AHA-2018-02-13.json 中有一个文件。

此文件从 ESB 连续接收数据。因此,我想像流一样读取这个文件并在行上执行操作。我试过这个:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = pyspark.SparkContext()
ssc = StreamingContext(sc, batchDuration=60)
a = 'adl://datalake.azuredatalakestore.net/Data/prod/2018/2018-02/AHA-2018-02-13.csv'
b = "adl://datalake.azuredatalakestore.net/Data/dev/test.out"
lines = ssc.textFileStream(a)
words = lines.flatMap(lambda x : x.split(' '))
wordCounts = words.map(lambda x :  (x, 1)).reduceByKey(lambda x,y : x+y)
wordCounts.saveAsTextFiles(b)
ssc.start()
ssc.awaitTermination()

这段代码运行没有错误,但在我的输出目录adl://datalake.azuredatalakestore.net/Data/dev/test.out 中,我可以看到每分钟创建几个文件夹,尽管我知道我收到了数据,但都是空的。

上面的代码只是一个例子。我目前正在尝试了解如何对我的文件执行流读取。对我来说,第一件事是计算文件中每个新行的字数。你能告诉我我当前的代码有什么问题吗?

【问题讨论】:

  • @user6910411 根据您的帖子,我尝试通过在进程运行时添加新文件来执行相同的操作......我仍然收到空文件作为输出。
  • 这不是添加新文件,而是进行 atomic 更改。无法写入文件。
  • @user6910411 实际上,我发现了问题所在。我应该监控一个文件夹,而不是一个文件。目前,我正在跟踪一个文件...

标签: python apache-spark pyspark streaming hdfs


【解决方案1】:

问题来自于我正在观看特定文件这一事实。我应该看一个文件夹来捕捉新文件。显然,在追加数据时没有本地方法来查看文件。

【讨论】: