【问题标题】:Dynamically Creating DAG based on Row available on DB Connection基于数据库连接上可用的行动态创建 DAG
【发布时间】:2019-03-24 03:52:17
【问题描述】:

我想从数据库表查询中创建一个动态创建的 DAG。当我尝试从精确数字范围或基于气流设置中的可用对象创建动态创建 DAG 时,它成功了。但是,当我尝试使用 PostgresHook 并为表的每一行创建一个 DAG 时,每当我在表中添加新行时,我都会看到一个新的 DAG 生成。但是事实证明,我无法在气流网络服务器 ui 上单击新创建的 DAG。有关更多上下文,我正在使用 Google Cloud Composer。我已经按照DAGs not clickable on Google Cloud Composer webserver, but working fine on a local Airflow 中提到的步骤进行操作。但是它仍然不适用于我的情况。

这是我的代码

from datetime import datetime, timedelta

from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os

default_args = {
  "owner": "debug",
  "depends_on_past": False,
  "start_date": datetime(2018, 10, 17),
  "email": ["airflow@airflow.com"],
  "email_on_failure": False,
  "email_on_retry": False,
  "retries": 1,
  "retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def create_dag(dag_id,
           schedule,
           default_args):
def hello_world_py(*args):
    print 'Hello from DAG: {}'.format(dag_id)

dag = DAG(dag_id,
          schedule_interval=timedelta(days=1),
          default_args=default_args)

with dag:
    t1 = PythonOperator(
        task_id=dag_id,
        python_callable=hello_world_py,
        dag_id=dag_id)

return dag


dag = DAG("dynamic_yolo_pg_", default_args=default_args,     
        schedule_interval=timedelta(hours=1))

"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
  dag_id = command.id
  schedule = timedelta(days=1)

  id = "dynamic_yolo_" + str(dag_id)

  print id

  globals()[id] = create_dag(id,
                           schedule,
                           default_args)

最好的,

【问题讨论】:

  • 请告诉我们您尝试了哪些代码,例如 postgres 钩子。
  • 您好,我已经发布了代码,谢谢。我认为它与stackoverflow.com/questions/51218314/… 有关但是问题仍然存在:(

标签: postgresql airflow airflow-scheduler


【解决方案1】:

这可以通过使用 [1] 中提到的步骤使用自我管理的 Airflow Webserver 来解决。执行此操作后,如果您决定在自管理网络服务器前添加身份验证,则创建入口后,您的 BackendServices 应该出现在 Google IAP 控制台上,您可以启用 IAP。如果您想以编程方式访问您的气流,您还可以使用服务帐户为您的自我管理的气流网络服务器 [2] 使用 JWT 身份验证。

[1]https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver

[2]https://cloud.google.com/iap/docs/authentication-howto

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-28
    • 1970-01-01
    • 2011-02-15
    • 2023-04-08
    • 2017-11-27
    相关资源
    最近更新 更多