【问题标题】:How do I test this function?如何测试此功能?
【发布时间】:2021-12-16 08:54:52
【问题描述】:

我有这个功能:

# spark already defined somewhere as:
spark = SparkSession.builder.appName("App").getOrCreate()

def read_data(spark):
    query = "SELECT * FROM table"
    pandas_df = pd.read_sql(query, conn)
    return spark.createDataFrame(pandas_df)

测试它:

from unittest import mock

@mock.patch("pandas.read_sql")
@mock.patch("pyspark.sql.SparkSession", autospec=True)
def test_read_data(spark_session, pandas_read_sql):
    result = read_data(spark_session)
    assert == ???

我应该以什么方式测试这是否有意义?任何帮助表示赞赏。

【问题讨论】:

    标签: python pandas pyspark python-unittest


    【解决方案1】:

    为了测试你的函数,你只需要模拟pandas.read_sqlspark_session 不能被模拟,你需要一个实例来正确地测试你的函数。您可以创建自己的pytest.fixture 来满足此要求。

    from unittest.mock import patch
    
    import pandas
    import pyspark.sql
    import pytest
    from pyspark.sql import SparkSession
    
    from your_module import read_data
    
    
    @pytest.fixture
    def spark_session():
        _spark_session = SparkSession.builder.appName("unit-tests").getOrCreate()
        yield _spark_session
        _spark_session.stop()
    
    
    @patch("pandas.read_sql")
    def test_read_data(mock_read_sql, spark_session):
        # given:
        mock_read_sql.return_value = pandas.DataFrame(
            [(1, "row1"), (2, "row2")], columns=["id", "column1"]
        )
    
        # when:
        spark_df = read_data(spark_session)
    
        # then:
        assert isinstance(spark_df, pyspark.sql.DataFrame)
    

    您可以做更多的断言并检查创建的数据框是否具有正确的架构并包含您期望的值。

    提示:您应该查看 spark sql 功能,因为您可能不需要使用 pandas 来查询您的数据库。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多