【发布时间】:2020-03-02 19:08:33
【问题描述】:
我有以下几点:
def fetch_sf_data():
response1 = requests.get("https://company.my.salesforce.com/"+ReportID1+"?export=1&enc=UTF-8&xf=csv",
headers = sf.headers, cookies = {'sid' : sid})
global salesforce_report
salesforce_report_raw = pd.read_csv(io.StringIO(response1.text))
salesforce_report = salesforce_report_raw[:-5]
def push_to_sql(salesforce_report):
salesforce_report.to_sql('Daily_Report_SF',engine,if_exists='replace' ,index=False)
t1 = PythonOperator(
task_id='fetch_sf_data',
python_callable=fetch_sf_data,
dag = dag
)
t2 = PythonOperator(
task_id='push_to_sql',
python_callable=push_to_sql,
dag=dag
)
t1 >> t2
任务 1 运行正常,但任务 2 失败并返回以下错误代码:
TypeError: push_to_sql() missing 1 required positional argument: 'salesforce_report'
我的印象是,因为我将“salesforce_report”声明为全局变量,所以它会毫无问题地传递到下一个任务。现在我使用气流不是这种情况吗?我应该怎么做才能使任务 2 不失败?
感谢您的帮助!
【问题讨论】:
标签: python-3.x airflow directed-acyclic-graphs