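【Posted】: 2020-12-17 21:43:33
【Question】:
I need to copy a file from S3 into Redshift using the COPY command. I am fairly new to Airflow and have run into a problem. Could someone correct the code below? Is it valid to call rs.execute() like this?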
Error:
op.execute()
TypeError: execute() missing 1 required positional argument: 'context'
Code:
import os
from datetime import datetime, timedelta  # needed by default_args below

from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer

default_args = {
    'owner': 'gra',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 13),
    'email': ['ss.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}


def job1():
    print('First Job to start')


def s3_redshift(**kwargs):
    rs = S3ToRedshiftTransfer(
        redshift_conn_id='12as',
        aws_conn_id='gt_read',
        schema='test',
        table='dept',
        s3_bucket="gng-test",
        s3_key="copt.csv",
        task_id="copy_redshift",
        # copy_options=copy_options_,
    )
    # This is the call that raises the TypeError shown above:
    # execute() is invoked without the task context.
    rs.execute()


# Note: `dag` and `app_start` are used below but are not defined in the posted snippet.
copy_redshift = PythonOperator(
    task_id='copy_redshift',
    python_callable=s3_redshift,
    provide_context=True,
    dag=dag,
)
app_start >> copy_redshift
【Discussion】:
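The TypeError comes from the fact that an Airflow operator's execute method takes a required positional argument, context, which the scheduler normally supplies when it runs the task. The sketch below reproduces the error without Airflow installed. FakeTransfer, s3_redshift_broken and s3_redshift_fixed are hypothetical stand-ins written for illustration only; they are not part of Airflow and no COPY is actually run.

```python
# Stand-in for an Airflow operator (hypothetical, not Airflow code).
# Like BaseOperator.execute, its execute() requires a positional `context`.
class FakeTransfer:
    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        # A real operator would issue the COPY here; this only reports the call.
        return "{} ran with context keys {}".format(self.task_id, sorted(context))


def s3_redshift_broken(**kwargs):
    rs = FakeTransfer(task_id="copy_redshift")
    # Same shape as the call in the question: no context is passed.
    return rs.execute()


def s3_redshift_fixed(**kwargs):
    rs = FakeTransfer(task_id="copy_redshift")
    # With provide_context=True, PythonOperator passes the task context
    # to the callable as keyword arguments, so it can be forwarded here.
    return rs.execute(context=kwargs)


if __name__ == "__main__":
    try:
        s3_redshift_broken(ds="2020-12-17")
    except TypeError as exc:
        print("broken:", exc)
    print("fixed:", s3_redshift_fixed(ds="2020-12-17"))
```

Applied to the DAG in the question, this suggests two possible changes. One is to forward the context inside s3_redshift with rs.execute(context=kwargs). The other, usually simpler, is to drop the PythonOperator wrapper and declare S3ToRedshiftTransfer(..., dag=dag) as the task itself, wired with app_start >> copy_redshift, so that Airflow calls execute with the context on its own.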
Tags: airflow