【问题标题】:Read parquet files created in last 2 hours in Pypark在 Pyspark 中读取过去 2 小时内创建的镶木地板文件
【发布时间】:2021-04-24 13:41:23
【问题描述】:
我想读取过去 2 小时内创建的 Azure Blob 存储中的多个 parquet 文件。
我可以使用以下命令读取多个文件
df = sqlContext.read.parquet("/path/*.parquet")
此查询返回该文件夹中所有 parquet 文件的结果,现在我只想从过去 2 小时内创建的 parquet 文件中获取数据。
请帮助我正确执行命令。
【问题讨论】:
标签:
apache-spark
pyspark
azure-active-directory
parquet
azure-blob-storage
【解决方案1】:
一种方法是使用Hadoop FS APIlistStatus 方法列出该文件夹下的所有文件,使用getModificationTime 选择在过去两个小时内修改的文件,并将过滤后的文件列表传递给 Spark DataFrame 阅读器。
在 Pyspark 中,您可以像这样通过 JVM 网关访问 hadoop fs:
import datetime
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()
data_path = "/path/*.parquet"
fs = Path(data_path).getFileSystem(conf)
file_status = fs.listStatus(Path(data_path))
last_2hours_time = (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp() * 1000
created_last_2hours = [
f.getPath().toString() for f in file_status if f.getModificationTime() >= last_2hours_time
]
df = spark.read.parquet(*created_last_2hours)
您可能还想查看 Python package 以了解 Azure Blob 存储以列出文件。
【解决方案2】:
标准的寻址方式是使用带有检查点的 spark Structured Streaming。
首先在整个数据上运行它,然后使用相同的检查点每 2 小时运行一次您的特定业务逻辑:
- 第一次运行将填充检查点数据,这样您就不会在第二次运行期间检查相同的文件。
- 您可以使用空逻辑进行第一次运行,例如在下面的
my_function() 示例中什么都不做。
spark.readStream.parquet("/path/*.parquet")\
.writeStream \
.trigger(once=True) \
.outputMode('append') \
.format("delta") \
.option("checkpointLocation", checkpoint_path) \
.start(path) \
.awaitTermination()
或者类似的东西
def my_function(df, epochId):
# do anything here
spark.readStream.parquet("/path/*.parquet") \
.writeStream \
.trigger(once=True) \
.foreachBatch(my_function) \
.option('checkpointLocation', 'some_location') \
.start() \
.awaitTermination()
查看其他examples