【问题标题】:Google Cloud Composer using google-cloud-bigquery python client library使用 google-cloud-bigquery python 客户端库的 Google Cloud Composer
【发布时间】:2019-01-07 06:24:03
【问题描述】:

我正在尝试在 Google Cloud Composer 中运行 DAG,其中第一个组件是使用 http GET 请求调用 API,然后使用 python-client 库将 json 插入 BigQuery 表。我正在尝试运行此功能:https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html

import requests
import datetime
import ast
import numpy as np
from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator
import google.cloud.bigquery as bigquery

client = bigquery.Client(project = 'is-flagship-data-api-sand')
dataset_id = 'Mobile_Data_Test'
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table('sample_wed')
table = client.get_table(table_ref)

def get_localytics_data():
    profiles_requests_command = "https://%s:%s@api.localytics.com/v1/exports/profiles/%d/profile"%(api_key,api_secret,28761)
    res_profiles = requests.get(profiles_requests_command)
    if res_profiles.status_code == 200:
        data = res_profiles.content
        data_split = data.split('\n')[:-1]
        data_split_ast = [ast.literal_eval(x) for x in data_split]

        #take out characters from the beginning to have neat columns
        data_split_ast_pretty = [dict(zip(map(lambda x: x[4:], item.keys()), item.values())) for item in data_split_ast]


        #add current date
        current_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        for item in data_split_ast_pretty:
            item['DateCreated'] = current_time


        random_sample = list(np.random.choice(data_split_ast_pretty,5))  
        print random_sample
        client.insert_rows_json(table = table, json_rows = random_sample)
    else:
        pass




run_api = python_operator.PythonOperator(task_id='call_api',
        python_callable=get_localytics_data)

我添加了以下的 PYPI 包:

请求 ===2.19.1

numpy ===1.12.0

google-cloud-bigquery ===1.4.0

我收到以下错误:Broken DAG: [/home/airflow/gcs/dags/composer_test_july30_v2.py] 'Client' object has no attribute 'get_table' 在 Airflow UI 控制台中。

显示的所有代码都可以在本地工作,但无法使用 Cloud Composer。

【问题讨论】:

  • 您使用的是哪个版本的 Cloud Composer?您可以通过描述环境来获得这一点。
  • 如果我去环境配置图像版本是:composer-0.5.1-airflow-1.9.0
  • 能否确认pypi安装成功?如果确实如此,它们应该显示在您的 pypi 包选项卡上。
  • Python 包索引 (PyPI) 名称请求所需的库 numpy google-cloud-bigquery 版本 ===2.19.1 ===1.12.0 ===1.4.0
  • 是的,pypi安装成功了。

标签: google-cloud-platform google-cloud-composer google-cloud-python


【解决方案1】:

听起来你有一个过时的google-cloud-bigquery 包,虽然它看起来不应该。

为了确定,需要通过 SSH 连接到 Composer 环境的 Google Kubernetes Engine (GKE) 集群并运行 pip freeze | grep bigquery 以了解安装的实际版本。

  1. 转到https://console.cloud.google.com/kubernetes/list
  2. 找到对应的GKE集群并点击。
  3. 单击顶部的连接。
  4. 在控制台中输入kubectl get pods。应该会出现一个 pod 列表。
  5. 输入kubectl exec -it <AIRFLOW_WORKER> /bin/bash 以airflow-worker-*开头的pod之一。
  6. 进入 pod 后,输入 pip freeze | grep bigquery,它应该会显示模块的版本。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2020-11-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多