【问题标题】:Airflow UI cannot manually trigger a dagAirflow UI 无法手动触发 dag
【发布时间】:2018-12-03 19:41:23
【问题描述】:

版本:

  • 气流 v1.10.1
  • SQL Server 后端
  • Web 服务器和调度程序运行在同一主机上
  • 本地执行器

使用 UI(按钮)手动触发时,不会运行简单的 hello world DAG。通过命令行运行时,相同的示例运行良好。希望允许用户使用 UI 来触发作业。这是一个错误吗。

Hello world DAG 测试示例:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_welcome():
    return 'Welcome!'

dag = DAG('say_welcome', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = DummyOperator(task_id='say_welcome_dummy_task', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='say_welcome_task', python_callable=print_welcome, dag=dag)

dummy_operator >> hello_operator

从命令行测试输出。

(airfow_v1_venv) sshuser@ed41-kp06sp:~/airflowv1/dags$ airflow trigger_dag say_welcome
[2018-12-03 19:38:34,679] {__init__.py:51} INFO - Using executor LocalExecutor
[2018-12-03 19:38:34,956] {models.py:271} INFO - Filling up the DagBag from /home/sshuser/airflowv1/dags
[2018-12-03 19:38:35,071] {cli.py:241} INFO - Created <DagRun say_welcome @ 2018-12-03 19:38:34+00:00: manual__2018-12-03T19:38:34+00:00, externally triggered: True>

使用 UI 触发时的日志

    context)
  File "/home/sshuser/airfow_v1_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
    cursor.executemany(statement, parameters)
IntegrityError: (pyodbc.IntegrityError) ('23000', u"[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'PK__task_ins__9BEABD04E2A8D429'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (say_welcome_task, say_welcome, Dec  3 2018  7:40PM). (2627) (SQLExecDirectW)") [SQL: u'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: (('say_welcome_task', 'say_welcome', datetime.datetime(2018, 12, 3, 19, 40, 9, 787000, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, u'', 'sshuser', None, None, 'default', 1, None, None, None, bytearray(b'\x80\x02}q\x00.')), ('say_welcome_dummy_task', 'say_welcome', datetime.datetime(2018, 12, 3, 19, 40, 9, 787000, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 3, u'', 'sshuser', None, None, 'default', 2, None, None, None, bytearray(b'\x80\x02}q\x00.')))]

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    在我看来,您试图覆盖旧的 say_welcome DAG。

    创建一个名为 say_welcome_v1 并试一试。

    创建新的 DAG 时,您必须更改它的名称,以便在元数据库中区分它。因此,每次 DAG 发生变化时,在 DAG 名称末尾使用 _v1_v2 等的约定。

    由于您遇到的错误是完整性错误,当您尝试使用与已经存在的其他内容相同的主键将某些内容插入数据库时​​,就会发生这种错误。很可能是来自与旧 DAG 同名的新 D​​AG 的错误。

    如果您没有任何值得保留历史/日志的旧 DAG 运行,您可以使用 airflow resetdb 后跟 airflow initdb 来重置您的数据库并从头开始。

    您还可以使用 airflow delete_dag my_dag_id 从 Airflow 版本 1.10 及更高版本中删除元数据库中的旧 DAG ID。

    【讨论】:

    • 已经做了 resetdb 选项来清理数据库。当它通过命令行运行时不会发生这种情况。同样从详细日志中,我看到气流正在尝试将记录两次插入数据库。不知道为什么会这样。
    • 您是否尝试将 DAG 名称更改为 say_welcome_v1
    • 是的,已重命名 dags。同样的错误。即使创建了新的 dag,也有同样的问题。在命令行中工作,使用 UI 触发有相同的错误。
    • 您能找到解决方案吗?
    • 是的,还没有找到解决方案。考虑为 mysql 或 postgres 转储 MS-SQL Server 后端,因为 mysql 似乎工作正常。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-06-24
    • 2021-12-08
    • 1970-01-01
    • 1970-01-01
    • 2018-01-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多