【发布时间】:2021-10-01 15:26:45
【问题描述】:
我正在尝试为气流中的 DAG 编写一些完整性测试。我目前正在使用airflow.utils.dag_cycle_tester 中的test_cycle 函数测试循环DAG。我的代码如下:
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
from airflow.utils.dag_cycle_tester import test_cycle
DAG_PATH = os.path.join(
os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
print(DAG_FILES)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(module_name,
module_path)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
dag_objects = [var for var in vars(module).values() if isinstance(var,
DAG)]
assert dag_objects
for dag in dag_objects:
test_cycle(dag)
由于某种原因,当我使用 pytest tests/ 运行它时,我收到以下错误:
___________在设置 test_cycle 时出错 ___________
def test_cycle(dag):
找不到 E 夹具“dag”
可用的夹具:anyio_backend、anyio_backend_name、anyio_backend_options、缓存、capfd、capfdbinary、caplog、capsys、capsysbinary、celery_config、celery_enable_logging、cov、doctest_namespace、monkeypatch、no_cover、pytestconfig、record_property、record_testsuite_property、record_xml_attribute、recwarn、ytmp_path、 tmpdir, tmpdir_factory 使用 'pytest --fixtures [testpath]' 获取帮助。
任何关于导致这种情况的想法都将不胜感激。谢谢!
【问题讨论】:
-
错误发生在另一个功能上,而不是您显示的功能上,显示该功能