【Title】: OAuth authentication in Apache Airflow (Google Cloud Composer)
【Posted】: 2022-05-12 02:01:51
【Question】:

I have successfully written an API in Python that reads Gmail messages, extracts the URL from a message, calls that URL, and stores a CSV file. However, when I deploy it in Apache Airflow [Google Cloud Composer], I get the error below (shown in the screenshot). I believe it is because my code cannot find token.json and credentials.json. I have tried many approaches and spent almost two days researching this problem, but could not find any solution.

Please note: the DAG file and the API file are loaded from a Cloud Storage bucket. Even token.json and credentials.json are in the same bucket.

Apache Airflow error (screenshot)

I am using an OAuth 2.0 key for the Gmail API:

import base64  # used to decode the message body below, was missing
import csv
import datetime
import json
import logging
import os
import re  # used to find the URL in the body below, was missing
import sys

import pandas as pd
import requests

from google.auth.transport.requests import Request
from google.cloud import bigquery, storage
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
    

full_date = datetime.datetime.now()
day = full_date.strftime("%d")
month = full_date.strftime("%b")
Year = full_date.strftime("%Y")

day_month_year = day+" "+month+" "+Year
subject = ""

    
def get_data_from():
    try:
        
        """Reads today's Gmail messages and stores the linked file as CSV.
        (Adapted from the Gmail API quickstart.)
        """
        creds = None
        # The file token.json stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first
        # time.
        
        if os.path.exists('token.json'):
            creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                # NOTE: run_local_server() opens a browser and cannot work on
                # a headless Composer worker; generate token.json locally first.
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open('token.json', 'w') as token:
                token.write(creds.to_json())

        service = build('gmail', 'v1', credentials=creds)
        
        # Call the Gmail API
        threads = service.users().threads().list(userId='me').execute().get('threads', [])
        for thread in threads:
            tdata = service.users().threads().get(userId='me', id=thread['id']).execute()
            nmsgs = len(tdata['messages'])
            msg = tdata['messages'][0]['payload']
            #print(msg['headers'])
            subject = ''
            for header in msg['headers']:
                if header['name'] == 'Subject':
                    subject = header['value']
                    
            for header in msg['headers']:
                if header['name'] == 'Date':
                    #print(len(header['value']))
                    date_email = header['value']
                    #if date_email == 'xyz':
                    if day_month_year in date_email:
                        print(date_email)
                        txt = service.users().messages().get(userId='me', id=thread['id']).execute()
                        #print(txt)
                        # Get value of 'payload' from dictionary 'txt' 
                        payload = txt['payload'] 
                        # The body is base64url-encoded; decode it with the
                        # URL-safe base64 decoder.
                        parts = payload.get('parts')[0]
                        data = parts['body']['data']
                        decoded_data = base64.urlsafe_b64decode(data.encode('utf-8')).decode('utf-8')
                        #print(decoded_data)
                        #print(type(decoded_data))

                        # Find the first URL in the email body.
                        URL = re.search(r"(?P<url>https?://[^\s]+)", decoded_data).group("url")
                        URL = URL.replace(">", "")
                        print(URL)
                        req = requests.get(URL)
                        url_content = req.content
                        print(url_content)
                        # Load into a dataframe
                        df = pd.read_excel(url_content)
                        print(df)
                        # Write to csv
                        file_name = subject + "-" + day_month_year + ".csv"
                        print(file_name)
                        # NOTE: 'bucket' is never defined above; it would need
                        # e.g. bucket = storage.Client().bucket('<bucket-name>')
                        # earlier. upload_from_string() also expects a string,
                        # not a DataFrame, hence df.to_csv() here.
                        bucket.blob('dags/orchestra/xyz/abc_temp/' + file_name).upload_from_string(df.to_csv(), 'text/csv')
                        df.to_csv(file_name)
                            
    except Exception as e:
        print(e)
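For reference, the base64url decoding and URL extraction steps used above can be sketched in isolation (the sample body string is made up for illustration):

```python
import base64
import re

def decode_gmail_body(data):
    # Gmail returns message bodies base64url-encoded ('-' and '_' instead
    # of '+' and '/'), so urlsafe_b64decode handles them directly without
    # manually replacing characters first.
    return base64.urlsafe_b64decode(data.encode('utf-8')).decode('utf-8')

# Round-trip a made-up body to show the decode and URL-extraction steps.
encoded = base64.urlsafe_b64encode(b'report ready: https://example.com/report.xlsx').decode('utf-8')
body = decode_gmail_body(encoded)
url = re.search(r"(?P<url>https?://[^\s]+)", body).group("url")
```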

【Comments】:

  • Have you tried using a non-relative, full path as explained in the document? Example: '/home/airflow/gcs/dags/orchestra/credentials.json'. The idea from this post may also help, although it concerns BigQuery.
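A minimal sketch of that suggestion, assuming the default Composer mount point (the orchestra/ sub-folder is taken from the question):

```python
import os

# Cloud Composer mounts the environment's dags/ bucket folder at this
# fixed path on every worker, so files uploaded to the bucket can be
# read with ordinary file I/O instead of relative paths.
GCS_DAGS_PREFIX = '/home/airflow/gcs/dags'

def composer_path(*parts):
    # Build the absolute worker-side path for a file stored under dags/.
    return os.path.join(GCS_DAGS_PREFIX, *parts)

token_path = composer_path('orchestra', 'token.json')
creds_path = composer_path('orchestra', 'credentials.json')
```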

Tags: python python-3.x google-cloud-platform airflow


【Solution 1】:

Try adding cache_discovery=False to the 'service' build() call; the discovery file cache depends on oauth2client and triggers the file_cache is unavailable when using oauth2client >= 4.0.0 warning, so disabling it avoids that.

Example:

service = build('gmail', 'v1', credentials=creds, cache_discovery=False)

【Discussion】:

  • You should also add some explanation to your answer; see How to Answer for guidance on writing a good answer.