【问题标题】:Airflow Dag Task Failing Global Variable Not RecognizedAirflow Dag 任务失败全局变量无法识别
【发布时间】:2020-03-02 19:08:33
【问题描述】:

我有以下几点:

def fetch_sf_data():
    response1 = requests.get("https://company.my.salesforce.com/"+ReportID1+"?export=1&enc=UTF-8&xf=csv",
                  headers = sf.headers, cookies = {'sid' : sid})
    global salesforce_report
    salesforce_report_raw = pd.read_csv(io.StringIO(response1.text))
    salesforce_report = salesforce_report_raw[:-5]

def push_to_sql(salesforce_report):
    salesforce_report.to_sql('Daily_Report_SF',engine,if_exists='replace' ,index=False)

t1 = PythonOperator(
    task_id='fetch_sf_data',
    python_callable=fetch_sf_data,
    dag = dag 
)

t2 = PythonOperator(
    task_id='push_to_sql',
    python_callable=push_to_sql,
    dag=dag
)

t1 >> t2

任务 1 运行正常,但任务 2 失败并返回以下错误代码:

TypeError: push_to_sql() missing 1 required positional argument: 'salesforce_report'

我的印象是,因为我将“salesforce_report”声明为全局变量,所以它会毫无问题地传递到下一个任务。现在我使用气流不是这种情况吗?我应该怎么做才能使任务 2 不失败?

感谢您的帮助!

【问题讨论】:

    标签: python-3.x airflow directed-acyclic-graphs


    【解决方案1】:

    这行不通。这是因为在 Airflow 中,每个任务都可以在不同的机器上运行,因此您应该将结果保存在 Xcom 或 S3/GCS 或本地文件系统中。然后在下一个任务中读取此文件并推送到 SQL。

    【讨论】:

    • 感谢您的回复!不幸的是,当我这样做时,我得到Broken DAG: name 'salesforce_report' is not defined
    • 更新了答案
    猜你喜欢
    • 1970-01-01
    • 2018-09-08
    • 2020-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-27
    相关资源
    最近更新 更多