【问题标题】:How to make Spark streams execute sequentially如何使 Spark 流按顺序执行
【发布时间】:2021-07-14 19:43:53
【问题描述】:

问题

我有一个总共执行两个流的作业,但我希望在第一个流完成后开始最后一个流,因为第一个流将来自 readstream 的事件保存在 DeltaTable 中,用作第二个流的输入。问题是,在当前笔记本运行中,第一个流中添加的内容在第二个流中不可用,因为它们同时启动。

有没有办法在从同一个笔记本运行时强制执行命令?

我尝试了awaitTermination 功能,但发现这并不能解决我的问题。一些伪代码:

def main():
    # Read eventhub
    metricbeat_df = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df.writeStream \
        .trigger({"once": True}) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events")

    # Parse events
    metricbeat_df = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")
  
    # *Do some transformations here*

    metricbeat_df.writeStream \
        .trigger({"once": True}) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events")

TLDR

总结问题:当我运行上面的代码时,query1query2 同时启动,这导致my_db.joined_bronze_eventsmy_db.raw_events 稍晚,因为在 query1 中添加的内容不可用在当前运行的 query2 中(当然会在下一次运行中)。

有没有办法强制 query2query1 完成之前不会启动,同时仍在同一个笔记本中运行它?

【问题讨论】:

    标签: apache-spark pyspark spark-streaming spark-structured-streaming


    【解决方案1】:

    当您使用选项Trigger.once 时,您可以在StreamingQuery 中使用processAllAvailable 方法:

    def main():
        # Read eventhub
        # note that I have changed the variable name to metricbeat_df1
        metricbeat_df1 = spark \
            .readStream \
            .format("eventhubs") \
            .options(**eh_conf) \
            .load()
    
        # Save raw events
        metricbeat_df1.writeStream \
            .trigger({"once": True}) \
            .format("delta") \
            .partitionBy("year", "month", "day") \
            .outputMode("append") \
            .option("checkpointLocation", "dbfs:/...") \
            .queryName("query1") \
            .table("my_db.raw_events") \
            .processAllAvailable()
    
        # Parse events
        # note that I have changed the variable name to metricbeat_df2
        metricbeat_df2 = spark.readStream \
            .format("delta") \
            .option("ignoreDeletes", True) \
            .table("my_db.raw_events")
      
        # *Do some transformations here*
    
        metricbeat_df2.writeStream \
            .trigger({"once": True}) \
            .format("delta") \
            .partitionBy("year", "month", "day") \
            .outputMode("append") \
            .option("checkpointLocation", "dbfs:/...") \
            .queryName("query2") \
            .table("my_db.joined_bronze_events") \
            .processAllAvailable()
    

    请注意,我已更改数据帧名称,因为它们对于两个流式查询不应该相同。

    方法processAllAvailable描述为:

    "阻塞,直到源中的所有可用数据都已处理并提交到接收器。此方法用于测试。请注意,在不断到达数据的情况下,此方法可能会永远阻塞。此外,此方法仅适用于保证阻塞,直到数据在调用之前被同步追加到 org.apache.spark.sql.execution.streaming.Source。(即 getOffset 必须立即反映添加)。"

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-04
      • 2018-09-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多