【发布时间】:2020-11-17 21:16:34
【问题描述】:
我正在尝试使用 airflow.hooks.S3Hook 创建一个简单的 DAG 编排 2 个任务,第一个在 bash 上打印一个简单的字符串,下一个是将 CSV 文件上传到 AWS s3 存储桶。 所以我得到这个错误:
unable to locate credentials
这是与 aws_access_key_id 和 aws_secret_access_key 相关的凭据错误。 我知道我可以使用 boto3 解决它,但我需要使用 airflow.hooks 来解决它
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_arguments = {'owner': 'airflow', 'start_date': days_ago(1)}
def upload_file_to_s3_bucket(filename, key, bucket_name, region_name):
hook = S3Hook(aws_conn_id='aws_default')
hook.create_bucket(bucket_name, region_name)
hook.load_file(filename, key, bucket_name)
with DAG('upload_to_aws',
schedule_interval='@daily',
catchup=False,
default_args=default_arguments
) as dag:
bash_task = BashOperator(task_id='bash_task',
bash_command='echo $TODAY',
env={'TODAY': '2020-11-16'})
python_task = PythonOperator(task_id='py_task',
python_callable=upload_file_to_s3_bucket,
op_kwargs={'filename': '*******.csv',
'key': 'my_s3_reasult.csv',
'bucket_name': 'tutobucket',
'region_name': 'us-east-1'}
)
bash_task >> python_task
【问题讨论】:
标签: python python-3.x boto3 airflow