【问题标题】:Save Spark Dataframe Before Writing to Snowflake在写入雪花之前保存 Spark 数据框
【发布时间】:2020-07-28 14:54:09
【问题描述】:

我在 PySpark 中工作,在获得要写入 Snowflake 的最终输出表之前,我进行了一系列转换并应用了用户定义的函数。写入 Snowflake 的最终命令需要大约 25 分钟才能运行,因为它还在执行所有计算,因为 Spark 会延迟评估并且直到最后一次调用才会评估。 我想在之前的步骤中评估最终表,这样我就可以计算所有转换需要多长时间,然后分别计算写入 Snowflake 步骤需要多长时间。我如何将两者分开?我已经尝试过:

temp = final_df.show() 

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

但我得到错误:

'NoneType' object has no attribute 'write'

然后用 collect()

temp = final_df.collect() 

temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
.option("dbtable","TEST_SPARK").save()

但我得到错误:

'list' object has no attribute 'write'

【问题讨论】:

    标签: apache-spark pyspark lazy-evaluation


    【解决方案1】:

    您的 temp 数据框的结果为 .show(),导致临时变量没有类型,只有 dataframe 对源有 .write 方法。

    Try with below code:

    temp = final_df
    #view records from temp dataframe
    temp.show()
    
    temp.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
    .option("dbtable","TEST_SPARK").save()
    

    #collect collects the data as list and stores into temp variable
    temp = final_df.collect() 
    
    #list attributes doesn't have .write method
    final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
    .option("dbtable","TEST_SPARK").save()
    

    Update:

    import time
    start_time = time.time()
    #code until show()
    temp = final_df
    #view records from temp dataframe
    temp.show()
    end_time = time.time()
    print("Total execution time for action: {} seconds".format(end_time - start_time))
    
    start_time_sfw = time.time()
    #code until show()
    final_df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions2) \
    .option("dbtable","TEST_SPARK").save()
    end_time_sfw = time.time()
    print("Total execution time for writing to snowflake: {} seconds".format(end_time_sfw - start_time_sfw))
    

    【讨论】:

    • 好点。但是第一个仍然会导致 .show() 步骤需要大约 25 分钟,然后再次写入步骤需要大约 25 分钟,因为它再次执行所有评估。我想在写雪花之前强制评估
    • @user3242036,是的,这是来自 spark .show() 操作的预期行为触发所有要执行的转换,调整你的工作做 temp=final_df.cache() 然后做 temp.show() 现在你的工作不会占用 1时间约 25 分钟。计算使用 time 包所花费的时间。检查我的更新答案!
    • 谢谢。我使用了 cache() 选项,它似乎有效
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-22
    • 1970-01-01
    • 2022-07-20
    • 1970-01-01
    • 2015-05-09
    • 1970-01-01
    相关资源
    最近更新 更多