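[Posted]: 2021-02-17 17:19:55
[Question]:
I am using pytest to test Airflow tasks that need a task context (and therefore a DAG). There are two files:
- conftest.py: contains the pytest fixtures
- test_bash.py: contains the task test
Here is the fixture for the DAG: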
import datetime
import pytest
from airflow import DAG

@pytest.fixture
def test_dag():
    args = {'owner': 'airflow', 'start_date': datetime.datetime(2021, 1, 1)}
    return DAG('test_dag', default_args=args, schedule_interval='@daily')
And this is the fixture for the postgres_container that I use as the Airflow metastore:
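from pytest_docker_tools import container, fetch

postgres_image = fetch(repository='postgres:13')
postgres_container = container(
    # Template string (not an f-string): pytest-docker-tools fills it in from the fixture above.
    image='{postgres_image.id}',
    scope='function',
    ports={'5432/tcp': None},
    environment={
        'POSTGRES_USER': 'airflow',
        'POSTGRES_PASSWORD': 'airflow',
    }
)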
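For reference, the test further down connects to the container's IP address on the in-container port 5432, not to the randomly mapped host port that `ports={'5432/tcp': None}` produces. A minimal sketch of how that URI is assembled; the helper name `metastore_uri` and its defaults are hypothetical and only mirror the values used in this post:

```python
import ipaddress


def metastore_uri(ip, port=5432, user="airflow", password="airflow", db="airflow"):
    """Build a SQLAlchemy URI for the Postgres container (hypothetical helper)."""
    # ip_address() raises ValueError if the string is not a valid IP address.
    addr = ipaddress.ip_address(ip)
    # IPv6 literals need brackets inside a URI; IPv4 addresses are used as-is.
    host = f"[{addr}]" if addr.version == 6 else str(addr)
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}"


uri = metastore_uri("172.17.0.2")
print(uri)
```

The address `172.17.0.2` is the one that appears in the captured output below; it will differ between runs.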
Here is the actual test:
import os
import ipaddress
import subprocess

from airflow.operators.bash import BashOperator


def test_bash_so(test_dag, postgres_container):
    print(
        f"Running PostgreSQL container named {postgres_container.name} "
        f"on port {postgres_container.ports['5432/tcp'][0]}."
    )
    postgres_ip = ipaddress.ip_address(postgres_container.ips.primary)

    # set sql_alchemy_conn to the URI of the postgres_container
    env = os.environ.copy()
    env['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = (
        f'postgresql+psycopg2://airflow:airflow@{postgres_ip}:5432/airflow')

    # initialize the Airflow metastore
    p = subprocess.run(['airflow', 'db', 'init'], capture_output=True, env=env)
    p.check_returncode()
    print(p.stdout)

    # check that the Airflow metastore was initialized successfully
    p = subprocess.run(['airflow', 'db', 'check'], capture_output=True, env=env)
    p.check_returncode()
    print(p.stdout)

    my_file = '/tmp/hello.txt'
    task = BashOperator(
        task_id='test_bash',
        bash_command=f'echo hello > {my_file}',
        dag=test_dag
    )
    task.run(
        start_date=test_dag.default_args['start_date'],
        end_date=test_dag.default_args['start_date']
    )

    with open(my_file, 'r') as f:
        assert f.read().replace('\n', '') == 'hello'
I think the task never actually ran, because the file is not created.
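As a sanity check that does not involve Airflow at all, the shell command and the final assertion can be exercised on their own. The sketch below assumes a POSIX shell and writes to a fresh temporary directory instead of the fixed `/tmp/hello.txt`, so a stale file from an earlier run cannot influence the result. It only shows that the command and the assertion agree with each other; it says nothing about why `task.run()` left no file behind.

```python
import pathlib
import shlex
import subprocess
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # Fresh path per run, so the file cannot already exist.
    my_file = pathlib.Path(tmp) / "hello.txt"
    assert not my_file.exists()

    # Same command the BashOperator is given, run directly through the shell.
    subprocess.run(f"echo hello > {shlex.quote(str(my_file))}", shell=True, check=True)

    # Same normalisation as the assertion in the test.
    content = my_file.read_text().replace("\n", "")

print(content)
```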
Here is the output when I run pytest -rP:
=============================================================================================================== FAILURES ===============================================================================================================
_____________________________________________________________________________________________________________ test_bash_so _____________________________________________________________________________________________________________
test_dag = <DAG: test_dag>, postgres_container = <pytest_docker_tools.wrappers.container.Container object at 0x7f085c8a1160>
    def test_bash_so(test_dag, postgres_container):
        print(
            f"Running PostgreSQL container named {postgres_container.name} "
            f"on port {postgres_container.ports['5432/tcp'][0]}."
        )
        postgres_ip = ipaddress.ip_address(postgres_container.ips.primary)
        env = os.environ.copy()
        env['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = (
            f'postgresql+psycopg2://airflow:airflow@{postgres_ip}:5432/airflow')
        p = subprocess.run(['airflow', 'db', 'init'], capture_output=True, env=env)
        p.check_returncode()
        print(p.stdout)
        p = subprocess.run(['airflow', 'db', 'check'], capture_output=True, env=env)
        p.check_returncode()
        print(p.stdout)
        my_file = '/tmp/hello.txt'
        task = BashOperator(
            task_id='test_bash',
            bash_command=f'echo hello > {my_file}',
            dag=test_dag
        )
        task.run(
            start_date=test_dag.default_args['start_date'],
            end_date=test_dag.default_args['start_date']
        )
>       with open(my_file, 'r') as f:
E       FileNotFoundError: [Errno 2] No such file or directory: '/tmp/hello.txt'
tests/plugins/custom_operators/test_bash_so.py:40: FileNotFoundError
-------------------------------------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------------------------------------
Fetching postgres:13
Waiting for container to be ready..
--------------------------------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------------------------------
Running PostgreSQL container named vigilant_austin on port 49237.
b"DB: postgresql+psycopg2://airflow:***@172.17.0.2:5432/airflow\n[2021-02-17 18:07:21,798] {db.py:678} INFO - Creating tables\n[2021-02-17 18:07:23,478] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.\n[2021-02-17 18:07:25,526] {migration.py:555} INFO - Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection\n[2021-02-17 18:07:25,528] {migration.py:555} INFO - Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text\n[2021-02-17 18:07:25,529] {migration.py:555} INFO - Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for MySql\n[2021-02-17 18:07:25,660] {dagbag.py:440} INFO - Filling up the DagBag from /home/denis/airflow/dags\n[2021-02-17 18:07:25,692] {example_kubernetes_executor_config.py:174} WARNING - Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'\n[2021-02-17 18:07:25,692] {example_kubernetes_executor_config.py:175} WARNING - Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']\n[2021-02-17 18:07:25,713] {dag.py:1813} INFO - Sync 28 DAGs\n[2021-02-17 18:07:25,717] {dag.py:1832} INFO - Creating ORM DAG for example_xcom_args_with_operators\n[2021-02-17 18:07:25,717] {dag.py:1832} INFO - Creating ORM DAG for tutorial\n[2021-02-17 18:07:25,718] {dag.py:1832} INFO - Creating ORM DAG for latest_only\n[2021-02-17 18:07:25,718] {dag.py:1832} INFO - Creating ORM DAG for example_trigger_target_dag\n[2021-02-17 18:07:25,718] {dag.py:1832} INFO - Creating ORM DAG for example_subdag_operator.section-1\n[2021-02-17 18:07:25,718] {dag.py:1832} INFO - Creating ORM DAG for example_trigger_controller_dag\n[2021-02-17 18:07:25,718] {dag.py:1832} INFO - Creating ORM DAG for example_passing_params_via_test_command\n[2021-02-17 18:07:25,719] {dag.py:1832} INFO - Creating ORM DAG for example_xcom\n[2021-02-17 18:07:25,719] {dag.py:1832} INFO - Creating ORM DAG for 
latest_only_with_trigger\n[2021-02-17 18:07:25,719] {dag.py:1832} INFO - Creating ORM DAG for example_kubernetes_executor\n[2021-02-17 18:07:25,719] {dag.py:1832} INFO - Creating ORM DAG for example_subdag_operator\n[2021-02-17 18:07:25,719] {dag.py:1832} INFO - Creating ORM DAG for example_subdag_operator.section-2\n[2021-02-17 18:07:25,719] {dag.py:1832} INFO - Creating ORM DAG for tutorial_etl_dag\n[2021-02-17 18:07:25,720] {dag.py:1832} INFO - Creating ORM DAG for example_short_circuit_operator\n[2021-02-17 18:07:25,720] {dag.py:1832} INFO - Creating ORM DAG for example_python_operator\n[2021-02-17 18:07:25,720] {dag.py:1832} INFO - Creating ORM DAG for example_complex\n[2021-02-17 18:07:25,720] {dag.py:1832} INFO - Creating ORM DAG for example_nested_branch_dag\n[2021-02-17 18:07:25,720] {dag.py:1832} INFO - Creating ORM DAG for example_dag_decorator\n[2021-02-17 18:07:25,721] {dag.py:1832} INFO - Creating ORM DAG for example_external_task_marker_child\n[2021-02-17 18:07:25,721] {dag.py:1832} INFO - Creating ORM DAG for example_external_task_marker_parent\n[2021-02-17 18:07:25,721] {dag.py:1832} INFO - Creating ORM DAG for example_task_group\n[2021-02-17 18:07:25,721] {dag.py:1832} INFO - Creating ORM DAG for example_branch_dop_operator_v3\n[2021-02-17 18:07:25,721] {dag.py:1832} INFO - Creating ORM DAG for example_branch_operator\n[2021-02-17 18:07:25,722] {dag.py:1832} INFO - Creating ORM DAG for example_xcom_args\n[2021-02-17 18:07:25,722] {dag.py:1832} INFO - Creating ORM DAG for example_bash_operator\n[2021-02-17 18:07:25,722] {dag.py:1832} INFO - Creating ORM DAG for example_skip_dag\n[2021-02-17 18:07:25,722] {dag.py:1832} INFO - Creating ORM DAG for tutorial_taskflow_api_etl\n[2021-02-17 18:07:25,722] {dag.py:1832} INFO - Creating ORM DAG for test_utils\n[2021-02-17 18:07:25,730] {dag.py:2266} INFO - Setting next_dagrun for example_bash_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,741] {dag.py:2266} INFO - Setting next_dagrun for 
example_branch_dop_operator_v3 to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,746] {dag.py:2266} INFO - Setting next_dagrun for example_branch_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,746] {dag.py:2266} INFO - Setting next_dagrun for example_complex to None\n[2021-02-17 18:07:25,747] {dag.py:2266} INFO - Setting next_dagrun for example_dag_decorator to None\n[2021-02-17 18:07:25,747] {dag.py:2266} INFO - Setting next_dagrun for example_external_task_marker_child to None\n[2021-02-17 18:07:25,747] {dag.py:2266} INFO - Setting next_dagrun for example_external_task_marker_parent to None\n[2021-02-17 18:07:25,747] {dag.py:2266} INFO - Setting next_dagrun for example_kubernetes_executor to None\n[2021-02-17 18:07:25,752] {dag.py:2266} INFO - Setting next_dagrun for example_nested_branch_dag to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,763] {dag.py:2266} INFO - Setting next_dagrun for example_passing_params_via_test_command to 2021-02-16 00:00:00+00:00\n[2021-02-17 18:07:25,763] {dag.py:2266} INFO - Setting next_dagrun for example_python_operator to None\n[2021-02-17 18:07:25,764] {dag.py:2266} INFO - Setting next_dagrun for example_short_circuit_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,764] {dag.py:2266} INFO - Setting next_dagrun for example_skip_dag to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,764] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,764] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-1 to None\n[2021-02-17 18:07:25,764] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-2 to None\n[2021-02-17 18:07:25,764] {dag.py:2266} INFO - Setting next_dagrun for example_task_group to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,765] {dag.py:2266} INFO - Setting next_dagrun for example_trigger_controller_dag to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,765] {dag.py:2266} INFO - 
Setting next_dagrun for example_trigger_target_dag to None\n[2021-02-17 18:07:25,765] {dag.py:2266} INFO - Setting next_dagrun for example_xcom to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,765] {dag.py:2266} INFO - Setting next_dagrun for example_xcom_args to None\n[2021-02-17 18:07:25,765] {dag.py:2266} INFO - Setting next_dagrun for example_xcom_args_with_operators to None\n[2021-02-17 18:07:25,766] {dag.py:2266} INFO - Setting next_dagrun for latest_only to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,766] {dag.py:2266} INFO - Setting next_dagrun for latest_only_with_trigger to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,766] {dag.py:2266} INFO - Setting next_dagrun for test_utils to None\n[2021-02-17 18:07:25,766] {dag.py:2266} INFO - Setting next_dagrun for tutorial to 2021-02-15 00:00:00+00:00\n[2021-02-17 18:07:25,767] {dag.py:2266} INFO - Setting next_dagrun for tutorial_etl_dag to None\n[2021-02-17 18:07:25,767] {dag.py:2266} INFO - Setting next_dagrun for tutorial_taskflow_api_etl to None\n[2021-02-17 18:07:25,792] {dag.py:1813} INFO - Sync 2 DAGs\n[2021-02-17 18:07:25,799] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-1 to None\n[2021-02-17 18:07:25,800] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-2 to None\nInitialization done\n"
b'[2021-02-17 18:07:26,759] {db.py:756} INFO - Connection successful.\n'
------------------------------------------------------------------------------------------------- postgres_container: vigilant_austin --------------------------------------------------------------------------------------------------
The files belonging to this database system will be owned by user "postgres".
This user must also own the server process.
The database cluster will be initialized with locale "en_US.utf8".
The default database encoding has accordingly been set to "UTF8".
The default text search configuration will be set to "english".
Data page checksums are disabled.
fixing permissions on existing directory /var/lib/postgresql/data ... ok
creating subdirectories ... ok
selecting dynamic shared memory implementation ... posix
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting default time zone ... Etc/UTC
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok
Success. You can now start the database server using:
pg_ctl -D /var/lib/postgresql/data -l logfile start
initdb: warning: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
waiting for server to start....2021-02-17 17:07:20.664 UTC [53] LOG: starting PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2021-02-17 17:07:20.665 UTC [53] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-02-17 17:07:20.667 UTC [54] LOG: database system was shut down at 2021-02-17 17:07:20 UTC
2021-02-17 17:07:20.671 UTC [53] LOG: database system is ready to accept connections
done
server started
CREATE DATABASE
/usr/local/bin/docker-entrypoint.sh: ignoring /docker-entrypoint-initdb.d/*
2021-02-17 17:07:20.898 UTC [53] LOG: received fast shutdown request
waiting for server to shut down....2021-02-17 17:07:20.899 UTC [53] LOG: aborting any active transactions
2021-02-17 17:07:20.900 UTC [53] LOG: background worker "logical replication launcher" (PID 60) exited with exit code 1
2021-02-17 17:07:20.900 UTC [55] LOG: shutting down
2021-02-17 17:07:20.908 UTC [53] LOG: database system is shut down
done
server stopped
PostgreSQL init process complete; ready for start up.
2021-02-17 17:07:21.019 UTC [1] LOG: starting PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2021-02-17 17:07:21.019 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
2021-02-17 17:07:21.019 UTC [1] LOG: listening on IPv6 address "::", port 5432
2021-02-17 17:07:21.020 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2021-02-17 17:07:21.024 UTC [81] LOG: database system was shut down at 2021-02-17 17:07:20 UTC
2021-02-17 17:07:21.027 UTC [1] LOG: database system is ready to accept connections
2021-02-17 17:07:21.829 UTC [94] ERROR: relation "connection" does not exist at character 569
2021-02-17 17:07:21.829 UTC [94] STATEMENT: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted, count(connection.conn_id) AS count_1
FROM connection GROUP BY connection.conn_id
HAVING count(connection.conn_id) > 1
2021-02-17 17:07:21.830 UTC [94] ERROR: current transaction is aborted, commands ignored until end of transaction block
2021-02-17 17:07:21.830 UTC [94] STATEMENT: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
FROM connection
WHERE connection.conn_type IS NULL
================================================================================================================ PASSES ================================================================================================================
=============================================================================================== 1 failed, 2 passed, 3 skipped in 11.83s ================================================================================================
Interestingly, if I comment out the actual task.run() call and everything after it, the Airflow metastore appears to be initialized correctly:
______________________________________________________________________________________________________________ test_bash_so _______________________________________________________________________________________________________________
-------------------------------------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------------------------------------
Fetching postgres:13
Waiting for container to be ready..
--------------------------------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------------------------------
Running PostgreSQL container named tender_rubin on port 49234.
DB: postgresql+psycopg2://airflow:***@172.17.0.2:5432/airflow
[2021-02-17 17:52:31,680] {db.py:678} INFO - Creating tables
[2021-02-17 17:52:33,363] {manager.py:727} WARNING - No user yet created, use flask fab command to do it.
[2021-02-17 17:52:35,409] {migration.py:555} INFO - Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection
[2021-02-17 17:52:35,411] {migration.py:555} INFO - Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text
[2021-02-17 17:52:35,413] {migration.py:555} INFO - Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for MySql
[2021-02-17 17:52:35,547] {dagbag.py:440} INFO - Filling up the DagBag from /home/denis/airflow/dags
[2021-02-17 17:52:35,570] {example_kubernetes_executor_config.py:174} WARNING - Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
[2021-02-17 17:52:35,570] {example_kubernetes_executor_config.py:175} WARNING - Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
[2021-02-17 17:52:35,592] {dag.py:1813} INFO - Sync 28 DAGs
[2021-02-17 17:52:35,596] {dag.py:1832} INFO - Creating ORM DAG for example_branch_operator
[2021-02-17 17:52:35,596] {dag.py:1832} INFO - Creating ORM DAG for example_external_task_marker_child
[2021-02-17 17:52:35,596] {dag.py:1832} INFO - Creating ORM DAG for example_trigger_controller_dag
[2021-02-17 17:52:35,597] {dag.py:1832} INFO - Creating ORM DAG for example_short_circuit_operator
[2021-02-17 17:52:35,597] {dag.py:1832} INFO - Creating ORM DAG for example_branch_dop_operator_v3
[2021-02-17 17:52:35,597] {dag.py:1832} INFO - Creating ORM DAG for example_passing_params_via_test_command
[2021-02-17 17:52:35,597] {dag.py:1832} INFO - Creating ORM DAG for latest_only_with_trigger
[2021-02-17 17:52:35,597] {dag.py:1832} INFO - Creating ORM DAG for tutorial_taskflow_api_etl
[2021-02-17 17:52:35,598] {dag.py:1832} INFO - Creating ORM DAG for example_external_task_marker_parent
[2021-02-17 17:52:35,598] {dag.py:1832} INFO - Creating ORM DAG for example_bash_operator
[2021-02-17 17:52:35,598] {dag.py:1832} INFO - Creating ORM DAG for example_subdag_operator
[2021-02-17 17:52:35,598] {dag.py:1832} INFO - Creating ORM DAG for example_python_operator
[2021-02-17 17:52:35,598] {dag.py:1832} INFO - Creating ORM DAG for example_dag_decorator
[2021-02-17 17:52:35,599] {dag.py:1832} INFO - Creating ORM DAG for example_subdag_operator.section-2
[2021-02-17 17:52:35,599] {dag.py:1832} INFO - Creating ORM DAG for tutorial
[2021-02-17 17:52:35,599] {dag.py:1832} INFO - Creating ORM DAG for test_utils
[2021-02-17 17:52:35,599] {dag.py:1832} INFO - Creating ORM DAG for example_kubernetes_executor
[2021-02-17 17:52:35,599] {dag.py:1832} INFO - Creating ORM DAG for latest_only
[2021-02-17 17:52:35,600] {dag.py:1832} INFO - Creating ORM DAG for example_xcom
[2021-02-17 17:52:35,600] {dag.py:1832} INFO - Creating ORM DAG for example_nested_branch_dag
[2021-02-17 17:52:35,600] {dag.py:1832} INFO - Creating ORM DAG for example_trigger_target_dag
[2021-02-17 17:52:35,600] {dag.py:1832} INFO - Creating ORM DAG for example_xcom_args_with_operators
[2021-02-17 17:52:35,600] {dag.py:1832} INFO - Creating ORM DAG for example_xcom_args
[2021-02-17 17:52:35,601] {dag.py:1832} INFO - Creating ORM DAG for example_task_group
[2021-02-17 17:52:35,601] {dag.py:1832} INFO - Creating ORM DAG for example_subdag_operator.section-1
[2021-02-17 17:52:35,601] {dag.py:1832} INFO - Creating ORM DAG for tutorial_etl_dag
[2021-02-17 17:52:35,601] {dag.py:1832} INFO - Creating ORM DAG for example_complex
[2021-02-17 17:52:35,601] {dag.py:1832} INFO - Creating ORM DAG for example_skip_dag
[2021-02-17 17:52:35,609] {dag.py:2266} INFO - Setting next_dagrun for example_bash_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,621] {dag.py:2266} INFO - Setting next_dagrun for example_branch_dop_operator_v3 to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,625] {dag.py:2266} INFO - Setting next_dagrun for example_branch_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,625] {dag.py:2266} INFO - Setting next_dagrun for example_complex to None
[2021-02-17 17:52:35,626] {dag.py:2266} INFO - Setting next_dagrun for example_dag_decorator to None
[2021-02-17 17:52:35,626] {dag.py:2266} INFO - Setting next_dagrun for example_external_task_marker_child to None
[2021-02-17 17:52:35,626] {dag.py:2266} INFO - Setting next_dagrun for example_external_task_marker_parent to None
[2021-02-17 17:52:35,626] {dag.py:2266} INFO - Setting next_dagrun for example_kubernetes_executor to None
[2021-02-17 17:52:35,631] {dag.py:2266} INFO - Setting next_dagrun for example_nested_branch_dag to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,642] {dag.py:2266} INFO - Setting next_dagrun for example_passing_params_via_test_command to 2021-02-16 00:00:00+00:00
[2021-02-17 17:52:35,642] {dag.py:2266} INFO - Setting next_dagrun for example_python_operator to None
[2021-02-17 17:52:35,643] {dag.py:2266} INFO - Setting next_dagrun for example_short_circuit_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,643] {dag.py:2266} INFO - Setting next_dagrun for example_skip_dag to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,643] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,643] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-1 to None
[2021-02-17 17:52:35,643] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-2 to None
[2021-02-17 17:52:35,644] {dag.py:2266} INFO - Setting next_dagrun for example_task_group to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,644] {dag.py:2266} INFO - Setting next_dagrun for example_trigger_controller_dag to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,644] {dag.py:2266} INFO - Setting next_dagrun for example_trigger_target_dag to None
[2021-02-17 17:52:35,644] {dag.py:2266} INFO - Setting next_dagrun for example_xcom to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,644] {dag.py:2266} INFO - Setting next_dagrun for example_xcom_args to None
[2021-02-17 17:52:35,645] {dag.py:2266} INFO - Setting next_dagrun for example_xcom_args_with_operators to None
[2021-02-17 17:52:35,645] {dag.py:2266} INFO - Setting next_dagrun for latest_only to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,645] {dag.py:2266} INFO - Setting next_dagrun for latest_only_with_trigger to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,645] {dag.py:2266} INFO - Setting next_dagrun for test_utils to None
[2021-02-17 17:52:35,645] {dag.py:2266} INFO - Setting next_dagrun for tutorial to 2021-02-15 00:00:00+00:00
[2021-02-17 17:52:35,646] {dag.py:2266} INFO - Setting next_dagrun for tutorial_etl_dag to None
[2021-02-17 17:52:35,646] {dag.py:2266} INFO - Setting next_dagrun for tutorial_taskflow_api_etl to None
[2021-02-17 17:52:35,671] {dag.py:1813} INFO - Sync 2 DAGs
[2021-02-17 17:52:35,677] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-1 to None
[2021-02-17 17:52:35,677] {dag.py:2266} INFO - Setting next_dagrun for example_subdag_operator.section-2 to None
Initialization done
[2021-02-17 17:52:36,652] {db.py:756} INFO - Connection successful.
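So I am wondering: does the task.run() code after the subprocess.run() calls somehow affect the metastore initialization? Perhaps the task runs before the metastore has been initialized? Or is the task not running at all for some other reason?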
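One thing that can be checked without Airflow is how far the `env=` argument reaches. A mapping passed to `subprocess.run()` applies only to the child process; the Python process that later calls `task.run()` keeps its own, unchanged `os.environ`. The sketch below demonstrates just that scoping with the standard library. The variable name `DEMO_METASTORE_URI` is hypothetical and stands in for `AIRFLOW__CORE__SQL_ALCHEMY_CONN`. Whether this is the actual reason the task appears not to run is not established by the output above.

```python
import os
import subprocess
import sys

# Hypothetical variable name, standing in for AIRFLOW__CORE__SQL_ALCHEMY_CONN.
VAR = "DEMO_METASTORE_URI"


def child_sees(env):
    """Start a child Python process with `env` and return the value of VAR it sees."""
    p = subprocess.run(
        [sys.executable, "-c", f"import os; print(os.environ.get({VAR!r}, ''))"],
        capture_output=True,
        text=True,
        env=env,
        check=True,
    )
    return p.stdout.strip()


os.environ.pop(VAR, None)  # make sure the parent starts without the variable
env = os.environ.copy()    # same pattern as in the test: modify a copy only
env[VAR] = "postgresql+psycopg2://airflow:airflow@172.17.0.2:5432/airflow"

child_value = child_sees(env)        # the child receives the modified copy
parent_value = os.environ.get(VAR)   # the parent process is unaffected

print("child :", child_value)
print("parent:", parent_value)
```

If the in-process code is supposed to see the same setting as the `airflow db init` subprocess, the variable would have to be present in the test process's own environment as well, and early enough for Airflow to read it.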
[Discussion]: