【发布时间】:2018-01-24 02:29:49
【问题描述】:
有一个全天更新的 CSV 文件数据湖。我正在尝试使用 Trigger.Once 功能 outlined in this blog post 创建 Spark 结构化流式处理作业,以定期将已写入 Parquet 数据湖中 CSV 数据湖的新数据写入。
这是我所拥有的:
val df = spark
.readStream
.schema(s)
.csv("s3a://csv-data-lake-files")
以下命令将所有数据写入 Parquet 湖,但在所有数据写入后并没有停止(我不得不手动取消作业)。
processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
以下作业也有效,但在写入所有数据后也没有停止(我不得不手动取消作业):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()
以下命令在写入任何数据之前停止查询。
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()
如何将writeStream 查询配置为等到所有增量数据都写入 Parquet 文件然后停止?
【问题讨论】:
-
“没有停止”是什么意思?这是一个流媒体作业,它不应该停止,只是每天触发一次。
-
@YuvalItzchakov- 我想启动一个集群,将 CSV 湖中的新数据写入 Parquet 湖,然后关闭集群。我假设 writeStream 进程会停止。在 Databricks 博客文章 (databricks.com/blog/2017/05/22/…) 中,“使用 Databricks 调度运行”部分的图片显示了具有设定持续时间和成功状态的作业。如果 writeStream 作业继续运行,则集群不会关闭。我想我一定错过了什么。
-
你没有遗漏任何东西,我查看了代码,似乎查询应该在执行单个作业后终止。
-
我无法复制它。你能用本地文件系统创建一个复制器吗?
-
@Powers 我也面临同样的问题。理想情况下,流式查询应该在作业完成后停止。但是,它并没有停止。事实上,它一直在运行,并且还保持连接打开。
标签: scala apache-spark spark-structured-streaming