【发布时间】:2021-07-14 19:43:53
【问题描述】:
问题
我有一个总共执行两个流的作业,但我希望在第一个流完成后开始最后一个流,因为第一个流将来自 readstream 的事件保存在 DeltaTable 中,用作第二个流的输入。问题是,在当前笔记本运行中,第一个流中添加的内容在第二个流中不可用,因为它们同时启动。
有没有办法在从同一个笔记本运行时强制执行命令?
我尝试了awaitTermination 功能,但发现这并不能解决我的问题。一些伪代码:
def main():
# Read eventhub
metricbeat_df = spark \
.readStream \
.format("eventhubs") \
.options(**eh_conf) \
.load()
# Save raw events
metricbeat_df.writeStream \
.trigger({"once": True}) \
.format("delta") \
.partitionBy("year", "month", "day") \
.outputMode("append") \
.option("checkpointLocation", "dbfs:/...") \
.queryName("query1") \
.table("my_db.raw_events")
# Parse events
metricbeat_df = spark.readStream \
.format("delta") \
.option("ignoreDeletes", True) \
.table("my_db.raw_events")
# *Do some transformations here*
metricbeat_df.writeStream \
.trigger({"once": True}) \
.format("delta") \
.partitionBy("year", "month", "day") \
.outputMode("append") \
.option("checkpointLocation", "dbfs:/...") \
.queryName("query2") \
.table("my_db.joined_bronze_events")
TLDR
总结问题:当我运行上面的代码时,query1 和 query2 同时启动,这导致my_db.joined_bronze_events 比my_db.raw_events 稍晚,因为在 query1 中添加的内容不可用在当前运行的 query2 中(当然会在下一次运行中)。
有没有办法强制 query2 在 query1 完成之前不会启动,同时仍在同一个笔记本中运行它?
【问题讨论】:
标签: apache-spark pyspark spark-streaming spark-structured-streaming