【发布时间】:2021-07-21 15:51:09
【问题描述】:
我正在尝试模拟具有以下代码的函数write_tables_to_datalake_by_partition_datehour()。
tables_with_data[table].repartition("datehour_partition").write.format(
"parquet"
).partitionBy("datehour_partition").mode("overwrite").save(
cleansed_file_path + table
)
tables_with_data[table] 的类型为 pyspark.sql.DataFrame。在查看链式函数时,它保持为 DataFrame,直到 write 函数返回 DataFrameWriter。 DataFrameWriter 实际上可以使用其余的功能。
我不想将数据写入我们的数据湖。我想嘲笑它。
@mock.patch("module_name.io.DataFrame")
def test_write_tables_to_datalake_by_partition_datehour(mock_df) -> None:
mock_df.return_value.repartition.return_value.write.format.return_value.partitionBy.return_value.mode.return_value.save.return_value = (
"yes!"
)
这不起作用,因为DataFrame 没有方法save,这仅在DataFrameWriter 中可用。但我无法修补DataFrameWriter,因为我没有在我的模块中导入它,只有DataFrame 是。
我也试过
with mock.patch.object(DataFrameWriter, "save") as mock1:
mock1.return_value.format.partitionBy.mode.save.return_value = "test"
result = write_tables_to_datalake_by_partition_datehour(
tablenames,
dataframes_cleansed,
cleansed_file_path,
dataframes_cleansed,
quarantine_file_path,
)
这也行不通。如何以正确的方式模拟它?
【问题讨论】:
标签: python-3.x dataframe pyspark mocking pytest