【Question Title】: cloud composer-airflow throws error: Broken DAG: cannot import name '_parse_data' when importing new DAG
【Posted】: 2019-09-19 09:39:48
【Question】:

I am trying to create a DAG in Cloud Composer. When importing it, I get the following error:

Broken DAG: [/home/airflow/gcs/dags/airflow_bigquery_v12.py] cannot import name _parse_data

Here is the DAG file. As you can see, it tries to copy a Cloud Storage file into BigQuery:

from datetime import datetime, timedelta  # single import style; mixing "import datetime"
                                          # with "from datetime import datetime" shadows the module

from airflow import DAG, models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
YESTERDAY = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())

DEFAULT_ARGS = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': YESTERDAY,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'project_id': models.Variable.get('gcp_project')
}


with DAG('airflow_bigquery_v12',
         default_args=DEFAULT_ARGS,
         schedule_interval=timedelta(days=1),
         catchup=False
         ) as dag:


    start_task = DummyOperator(task_id="start", dag=dag)
    end_task = DummyOperator(task_id="end", dag=dag)



    gcs_to_bigquery_rides = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_to_BigQuery_stage',
        bucket='my_bucket',
        destination_project_dataset_table='misc.pg_rides_json_airflow',
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=['rides_new.json'],
        #ignore_unknown_values = True,
        #schema_fields=dc(),
        schema_object= 'rides_schema.json',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        #skip_leading_rows = 1,
        google_cloud_storage_conn_id='google_cloud_storage_default',
        bigquery_conn_id='bigquery_default'
        )

start_task >> gcs_to_bigquery_rides >> end_task

For reference, here is the rides_new.json file found in 'my_bucket', containing the schema of the table to be created:

[
  {
    "mode": "NULLABLE",
    "name": "finish_picture_state",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_picture_file_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_reason",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "starting_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "finished_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "created_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "ending_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "state",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "currency",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "amount",
        "type": "INTEGER"
      }
    ],
    "mode": "NULLABLE",
    "name": "cost",
    "type": "RECORD"
  },
  {
    "mode": "NULLABLE",
    "name": "stoped_since",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "minutes",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "vehicle_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "distance",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "service_area_id",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "base",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "per_minute",
      }
    ],
    "mode": "NULLABLE",
    "name": "pricing",
    "type": "RECORD"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "m",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "latitude",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "longitude",
        "type": "FLOAT"
      }
    ],
    "mode": "REPEATED",
    "name": "path",
    "type": "RECORD"
  }
]
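A schema file like the one above can be sanity-checked locally before uploading it to the bucket, since a field missing a required key (every field needs `name` and `type`, and `RECORD` fields need nested `fields`) would otherwise only surface at load time. A minimal sketch, with a hypothetical `validate_bq_schema` helper and illustrative sample data (neither is part of the question):

```python
def validate_bq_schema(fields, path="$"):
    """Recursively collect problems in a BigQuery JSON schema:
    every field needs 'name' and 'type'; RECORD fields need 'fields'."""
    errors = []
    for i, field in enumerate(fields):
        loc = "%s[%d]" % (path, i)
        if "name" not in field:
            errors.append(loc + ": missing 'name'")
        if "type" not in field:
            errors.append("%s (%s): missing 'type'" % (loc, field.get("name", "?")))
        elif field["type"] == "RECORD":
            if "fields" not in field:
                errors.append("%s (%s): RECORD without 'fields'" % (loc, field["name"]))
            else:
                errors.extend(validate_bq_schema(field["fields"], loc + ".fields"))
    return errors

# Example: one well-formed field and one missing its 'type'.
sample = [
    {"mode": "NULLABLE", "name": "user_id", "type": "STRING"},
    {"mode": "NULLABLE", "name": "per_minute"},
]
print(validate_bq_schema(sample))
```

Running this on the real file is just `validate_bq_schema(json.load(open("rides_schema.json")))`.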

Any help would be much appreciated. Thanks.

【Question comments】:

  • Welcome to StackOverflow. Looking at Airflow's source code, the _parse_data function appears in BigQueryHook: github.com/apache/airflow/blob/1.9.0/airflow/contrib/hooks/… . GoogleCloudStorageToBigQueryOperator uses this hook, but only on Airflow version 1.9.0. Is that the version you are running? By the way, can you reproduce the error on a local Airflow installation, or does it only happen when deployed on Composer?
  • Could you point out which versions of pandas-gbq and apache-airflow caused this problem in your case? That could help others.

Tags: python google-cloud-platform google-bigquery google-cloud-storage airflow


【Solution 1】:

_parse_data was removed from pandas-gbq in version 0.10.0:

https://github.com/pydata/pandas-gbq/commit/ebcbfbe1fecc90ac9454751206115adcafe4ce24#diff-4db670026d33c02e5ad3dfbd5e4fd595L664

Airflow stopped using _parse_data after 1.10.0:

https://github.com/apache/airflow/commit/8ba86072f9c5ef81933cd6546e7e2f000f862053#diff-ee06f8fcbc476ea65446a30160c2a2b2L27

You therefore need to either:

  • upgrade apache-airflow to a version above 1.10.0 (where gcs_to_bq no longer imports _parse_data), or

  • downgrade pandas-gbq to a version below 0.10.0.
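Either way, it is worth confirming which side of the break the environment is on. A rough sketch of such a check, assuming pandas-gbq keeps (or kept) the private symbol in the pandas_gbq.gbq module as in the 0.9.x sources; the helper name is mine:

```python
import importlib


def has_parse_data():
    """Return True if the installed pandas-gbq still defines the private
    _parse_data helper that Airflow <= 1.10.0 imports in gcs_to_bq;
    False if the symbol is gone or pandas-gbq is not installed at all."""
    try:
        gbq = importlib.import_module("pandas_gbq.gbq")
    except ImportError:
        return False
    return hasattr(gbq, "_parse_data")


if __name__ == "__main__":
    print("pandas-gbq exposes _parse_data:", has_parse_data())
```

On Cloud Composer the pin itself would be applied through the environment's PyPI packages list (e.g. an entry like pandas-gbq==0.9.0) rather than by running pip inside the workers.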

【Discussion】:
