【Title】: airflow S3ToRedshiftTransfer
【Posted】: 2020-12-17 21:43:33
【Question】:

I need to copy a file from S3 into Redshift with the COPY command. I'm fairly new to Airflow and have run into a problem. Could someone correct the code below? Can I call rs.execute() like this?

Error:
    op.execute()
TypeError: execute() missing 1 required positional argument: 'context'

Code:

import os
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer

default_args = {
    'owner': 'gra',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 13),
    'email': ['ss.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}



def job1():
    print('First Job to start')

def s3_redshift(**kwargs):
    rs= S3ToRedshiftTransfer(redshift_conn_id ='12as',
                            aws_conn_id='gt_read',
                            schema='test',
                            table='dept',
                            s3_bucket="gng-test",
                            s3_key="copt.csv",
                            task_id="copy_redshift"
                            #copy_options=copy_options_,
                            )
    rs.execute()

copy_redshift=PythonOperator(task_id='copy_redshift', python_callable=s3_redshift,provide_context=True, dag=dag)
app_start >> copy_redshift

【Comments】:

    Tags: airflow


    【Solution 1】:

    I was able to run the copy from S3 to Redshift using boto3. S3ToRedshiftTransfer can also be used to do the same thing.

    # airflow related
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.bash_operator import BashOperator
    # other packages
    from datetime import datetime
    from datetime import timedelta
    # from airflow.hooks import PostgresHook
    from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer
    #from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
    from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
    from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
    import boto3
    
    default_args = {
        'owner': 'grit_delta',
        'depends_on_past': False,
        'start_date': datetime(2020, 12, 13),
        'email': ['sa.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'schedule_interval': '@daily',
        'retries': 1,
        'retry_delay': timedelta(seconds=5),
    }
    
    
    dag=DAG(dag_id='veritas_test',default_args=default_args,schedule_interval=timedelta(1))
    
    
    def job1():
        print('First Job to start')
    
    file_sensor = S3KeySensor(task_id = 's3_key_sensor_task',
                    s3_conn_id='_read',
                    poke_interval=120,
                    timeout=18*60*60,
                    bucket_key = "data/test.*",
                    bucket_name = "g-test",
                    wildcard_match = True,
                    dag = dag
    )
    
    app_start=PythonOperator(task_id='app_start', python_callable=job1, dag=dag)
    
    
    def s3_redshift(**kwargs):
        rsd = boto3.client('redshift-data')
        deptKey='s3://airflow-dev/code/gta/dag/dept.csv'
        sqlQuery="copy test.dept  from 's3://airflow-grole' CSV ;"
        #sqlQuery="insert into test.dept values('d1221',100)"
        print(sqlQuery)
        resp = rsd.execute_statement(
            ClusterIdentifier="opes",
            Database="ee",
            DbUser="aa",
            Sql=sqlQuery
            #Sql="CREATE TABLE IF NOT EXISTS test.dept (title varchar(10), rating   int);"
                )
        print(resp)
        print(" completed")
        return "OK"
    
    
    copy_redshift=PythonOperator(task_id='copy_redshift', python_callable=s3_redshift,provide_context=True, dag=dag)
    file_sensor >>app_start >> copy_redshift
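
    Note that execute_statement only submits the statement and returns immediately with an Id, so the PythonOperator above can report success even if the COPY later fails inside Redshift. Below is a minimal sketch of waiting for the result with describe_statement; the wait_for_statement helper and its poll_seconds argument are my own names, not part of the original DAG:

    import time
    import boto3

    def wait_for_statement(statement_id, poll_seconds=5):
        """Poll the Redshift Data API until the submitted statement finishes."""
        rsd = boto3.client('redshift-data')
        while True:
            desc = rsd.describe_statement(Id=statement_id)
            status = desc['Status']
            if status == 'FINISHED':
                return desc
            if status in ('FAILED', 'ABORTED'):
                raise RuntimeError('statement did not finish: %s' % desc.get('Error'))
            time.sleep(poll_seconds)

    Calling wait_for_statement(resp['Id']) right after execute_statement makes the copy_redshift task fail when the COPY fails, instead of always returning "OK".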
    

    【Comments】:

      【Solution 2】:

      You haven't defined a DAG, and an operator isn't meant to be used that way. I suggest reading more about how Airflow works. In any case, the code should look like this:

      import os
      from datetime import datetime, timedelta

      from airflow import DAG
      from airflow.operators.python_operator import PythonOperator
      from airflow.utils.dates import days_ago
      from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer

      default_args = {
          'owner': 'gra',
          'depends_on_past': False,
          'start_date': datetime(2020, 12, 13),
          'email': ['ss.com'],
          'email_on_failure': False,
          'email_on_retry': False,
          'retries': 1,
          'retry_delay': timedelta(seconds=5),
      }


      def job1():
          print('First Job to start')


      with DAG('dag_name', schedule_interval='@daily', default_args=default_args) as dag:
          # tasks declared inside the DAG context are attached to it automatically
          app_start = PythonOperator(task_id='app_start', python_callable=job1)

          rs = S3ToRedshiftTransfer(redshift_conn_id='12as',
                                    aws_conn_id='gt_read',
                                    schema='test',
                                    table='dept',
                                    s3_bucket="gng-test",
                                    s3_key="copt.csv",
                                    task_id="copy_redshift")

          app_start >> rs
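
      On Airflow 2.x with the apache-airflow-providers-amazon package installed, S3ToRedshiftTransfer is replaced by the S3ToRedshiftOperator that the commented-out import in Solution 1 points at. A minimal sketch of the same copy with that operator, assuming connections named redshift_default and aws_default exist (the DAG id here is made up):

      from datetime import datetime
      from airflow import DAG
      from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

      with DAG('s3_to_redshift_provider',
               start_date=datetime(2020, 12, 13),
               schedule_interval='@daily',
               catchup=False) as dag:
          copy_redshift = S3ToRedshiftOperator(
              task_id='copy_redshift',
              schema='test',
              table='dept',
              s3_bucket='gng-test',
              s3_key='copt.csv',
              copy_options=['CSV'],                 # appended to the generated COPY statement
              redshift_conn_id='redshift_default',  # assumed connection id
              aws_conn_id='aws_default',            # assumed connection id
          )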
      

      【Comments】:
