【问题标题】:setting up airflow with bigquery operator使用 bigquery 运算符设置气流
【发布时间】:2023-03-21 18:50:01
【问题描述】:

我正在试验数据管道的气流。不幸的是,到目前为止,我无法让它与 bigquery 运算符一起使用。我已经尽我所能寻找解决方案,但我仍然卡住了。我正在使用本地运行的顺序执行器。

这是我的代码:

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['example@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    dag_id='bigQueryPipeline', 
    default_args=default_args, 
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)

错误信息:

[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']

【问题讨论】:

  • Jean-Christophe Rodrigue,您找到解决方案了吗?我也坚持同样的信息。我无法弄清楚 bigquery_conn_id 是什么,因为 bigquery_default 对我不起作用。

标签: google-bigquery airflow


【解决方案1】:

我花了一段时间才终于找到它,因为它没有很清楚地记录在案。在气流 UI 中,转到管理 -> 连接。该连接 ID 是参数 bigquery_connection_id 所引用的内容。您必须在“附加”字段中添加一个 json 对象,该对象定义一对“项目”:“”

如果您没有在运行 Airflow 的机器上明确授权帐户,您还必须为“service_account”和“key_path”添加密钥。 (gcloud 身份验证)

【讨论】:

  • BigQuery 运算符在当前版本中已损坏,我已为其配置了所有必要的“附加功能”,但无法连接。他们应该在下一个版本中修复这个问题,但我不知道什么时候会出来。
  • 我正在运行 v1.7.1.3,它对我来说很好用。当 Google 将 oauth2 客户端升级为不再包含 SignedJwtAssertionCredentials 时,我遇到了一些麻烦,我通过降级我的 oauth 版本来修复它。新版本切换到使用 ServiceAccountCredentials。
  • 我在使用此解决方案时收到了一些弃用警告...关于不使用 **kwargs
  • airflow 1.8 中的格式改变了,我相信你不需要再指定额外的了,因为有一个谷歌云平台连接类型可以从下拉列表中选择。
【解决方案2】:

如果您需要以编程方式执行此操作,我将其用作堆栈中的入口点来创建连接(如果它尚不存在):

from airflow.models import Connection
from airflow.settings import Session

session = Session()
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()

【讨论】:

  • 这甚至会去哪里?
【解决方案3】:

最近我通过同时指定bigquery_conn_idgoogle_cloud_storage_conn_id 解决了一个类似的问题,如下所示:

t1 = BigQueryOperator(
  task_id='bigquery_test',
  bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
  destination_dataset_table=False,
  bigquery_conn_id='bigquery_default',             <-- Need these both
  google_cloud_storage_conn_id='bigquery_default', <-- becasue of inheritance 
  delegate_to=False,
  udf_config=False,
  dag=dag,
)

在此答案中查看更多信息:https://stackoverflow.com/a/45664830/634627

【讨论】:

    猜你喜欢
    • 2020-07-26
    • 2019-05-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-10
    • 2023-03-06
    相关资源
    最近更新 更多