【问题标题】:Cloud Function running multiple times instead of once云函数运行多次而不是一次
【发布时间】:2021-02-03 21:25:00
【问题描述】:

我每天晚上 11 点通过 Cron 作业将 10 个文件上传到 GCS 上的存储桶。每个文件都是一个.csv,大小从230 KB。文件名始终为YYYY-MM-DD-ID.csv

每次我将文件上传到该存储桶以将这些 .csv 文件发送到 BigQuery 时,都会调用一个云函数。触发类型为Cloud Storage on finalise/create 事件。

我的问题如下: 在 BigQuery 上,每行/列的每个值都乘以倍数。有时是 1(所以值相同),通常是 2,有时是 3。我在下面附上了一个示例,说明 BigQuery (BQ) 和 Google Cloud Storage (GCS) 之间的区别。

好像是多次调用云函数。它不在代码上,而是在触发期间从 Cloud Function 传递的一些重复消息。当我今天要离开日志选项卡时,我可以看到 Cloud Function upload_to_bigquery 已被多次调用。

我试图修复它,但我犯了一个错误。我以为我们可以将临时文件写入 Cloud Functions,但我们不能。我的解决方案是将要上传到 BigQuery 的文件名写入 .txt 文件。在 BigQuery 上上传新文件之前,请阅读该 .txt 文件并检查当前文件是否存在于该列表中。如果文件名已经存在,请跳过。否则,将 .txt 文件名写入列表并做我的事情。

if file_to_upload not in text:
    text.append(file_to_upload)
    with open("all_uploaded_files.txt", "w") as text_file:
        for item in text:
            text_file.write(item + "\n")

    bucket = storage_client.bucket('sfr-test-data')
    blob = bucket.blob("all_uploaded_files.txt")
    blob.upload_from_filename("all_uploaded_files.txt")
    ## do my things here

else:
    print("file already uploaded")
    # skip to new file to upload

但即使我能做到,这个解决方案也不可行。数月后临时文件将变得如此之大,以至于变得一团糟。您知道解决此问题的最简单方法是什么吗?

云函数:upload_to_big_query - main.py

BUCKET = "xxx"
GOOGLE_PROJECT = "xxx"
HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Last Non-Direct Click Conversions": "last_non_direct_click_conversions",
    "Last Non-Direct Click Conversion Value": "last_non_direct_click_conversion_value",
    "Last Click Prio Conversions": "last_click_prio_conversions",
    "Last Click Prio Conversion Value": "last_click_prio_conversion_value",
    "Data-Driven Conversions": "dda_conversions",
    "Data-Driven Conversion Value": "dda_conversion_value",
    "% Change in Conversions from Last Non-Direct Click to Last Click Prio": "last_click_prio_vs_last_click",
    "% Change in Conversions from Last Non-Direct Click to Data-Driven": "dda_vs_last_click"
}

SPEND_HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Spend": "spend"
}

tables_schema = {
    "google-analytics": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("goal", bigquery.enums.SqlTypeNames.STRING, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversions", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE')
    ],
    "google-analytics-spend": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("spend", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
    ]
}


def download_from_gcs(file):
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.get_blob(file['name'])
    file_name = os.path.basename(os.path.normpath(file['name']))
    blob.download_to_filename(f"/tmp/{file_name}")
    return file_name


def load_in_bigquery(file_object, dataset: str, table: str):
    client = bigquery.Client()
    table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        schema=tables_schema[table]
    )

    job = client.load_table_from_file(file_object, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.


def __order_columns(df: pd.DataFrame, spend=False) ->pd.DataFrame:
    # We want to have source and medium columns at the third position
    # for a spend data frame and at the fourth postion for others df
    # because spend data frame don't have goal column.
    pos = 2 if spend else 3

    cols = df.columns.tolist()
    cols[pos:2] = cols[-2:]
    cols = cols[:-2]
    return df[cols]


def __common_transformation(df: pd.DataFrame, date: str, goal: str) -> pd.DataFrame:
    # for any kind of dataframe, we add date and week columns
    # based on the file name and we split Source/Medium from the csv
    # into two different columns

    week_of_the_year = datetime.strptime(date, '%Y-%m-%d').isocalendar()[1]
    df.insert(0, 'date', date)
    df.insert(1, 'week', week_of_the_year)
    mapping = SPEND_HEADER_MAPPING if goal == "spend" else HEADER_MAPPING
    print(df.columns.tolist())
    df = df.rename(columns=mapping)
    print(df.columns.tolist())
    print(df)
    df["source_medium"] = df["source_medium"].str.replace(' ', '')
    df[["source", "medium"]] = df["source_medium"].str.split('/', expand=True)
    df = df.drop(["source_medium"], axis=1)
    df["week"] = df["week"].astype(int, copy=False)
    return df


def __transform_spend(df: pd.DataFrame) -> pd.DataFrame:
    df["spend"] = df["spend"].astype(float, copy=False)
    df = __order_columns(df, spend=True)
    return df[df.columns[:6]]


def __transform_attribution(df: pd.DataFrame, goal: str) -> pd.DataFrame:
    df.insert(2, 'goal', goal)
    df["last_non_direct_click_conversions"] = df["last_non_direct_click_conversions"].astype(int, copy=False)
    df["last_click_prio_conversions"] = df["last_click_prio_conversions"].astype(int, copy=False)
    df["dda_conversions"] = df["dda_conversions"].astype(float, copy=False)
    return __order_columns(df)


def transform(df: pd.DataFrame, file_name) -> pd.DataFrame:
    goal, date, *_ = file_name.split('_')
    df = __common_transformation(df, date, goal)
    # we only add goal in attribution df (google-analytics table).
    return __transform_spend(df) if "spend" in file_name else __transform_attribution(df, goal)


def main(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    file = event

    file_name = download_from_gcs(file)
    df = pd.read_csv(f"/tmp/{file_name}")

    transformed_df = transform(df, file_name)

    with open(f"/tmp/bq_{file_name}", "w") as file_object:
        file_object.write(transformed_df.to_csv(index=False))

    with open(f"/tmp/bq_{file_name}", "rb") as file_object:
        table = "google-analytics-spend" if "spend" in file_name else "google-analytics"
        load_in_bigquery(file_object, dataset='attribution', table=table)

【问题讨论】:

  • 应该在此处解释为什么会发生这种情况stackoverflow.com/questions/47360570/… 您实际上可以在文件更新到 BigQuerym 后将其移动到名为“gs://bucket/path/”的文件夹中到/csv/上传/"

标签: google-cloud-functions


【解决方案1】:

您可能更愿意查看此主题:

BigQuery displaying wrong results - Duplicating data from Cloud Function?

很快 - 该函数是幂等的,并且进程的状态(如果数据/文件是否上传到 BQ)应该保留在云函数之外。一个文本文件(在某些 GCS 存储桶中,而不是在云函数内存中,可以在云函数执行完成后立即删除)是一种选择,但 GCS 在这种特殊情况下有很多缺点。例如,firestore - 是更好的选择。

您可以考虑以下算法 -

当你的云函数启动时,它应该根据输入数据计算一些哈希码 - 文件/对象元数据或文件/对象数据或两者的组合。该哈希 - 对于给定的数据集应该是唯一的。

您的云函数连接到预定义的 firestore 集合(项目和名称可以在环境变量中提供)并检查是否存在以给定哈希作为 id 的文档/记录 - 是否已经存在。

如果 firestore 集合中已经存在该哈希(文档存在) - 云函数完成其执行并且不执行任何其他操作(可以进行日志记录,如果需要,将一些额外的详细信息添加到 firestore 文档等)。这样就简单地完成了它的执行。

如果未找到该散列(文档不存在) - 云函数会创建一个新文档,并将给定散列作为 id。如果需要,可以将一些元数据详细信息添加到该文档中。

创建文档后,云功能继续主“工作流程”。

需要牢记的几件事。

1/ IAM 权限 - 运行云功能的服务帐户 - 应该对 firestore 具有相关权限。显然,firestore API 将在给定项目中启用...

2/ 如果云函数创建了一个新的 firestore 文档,但随后未能将数据加载到 BigQuery(出于任何原因),将会发生什么情况。可能仅检查 Firestore 文档的存在是不够的。因此,要在 firestore 文档中维护适当的“状态”。例如,当创建一个新文档时(在 Firestore 中),应该有一个字段 __state 并为其分配一个值(例如)IN_PROGRESS。然后,当加载数据时,云函数会返回到 Firestore 并使用值 DONE 更新该字段(例如)。但即使这样也不够。由于您有加载作业 - 可能存在加载实际成功但云功能失败(包括超时在内的任何原因)的情况。您可能还想考虑在这种情况下该怎么做。在任何情况下,在 Firestore 中进行一些“状态”监控可能有助于了解/调查加载过程的情况。监控自动化可能需要开发单独的云功能,但这是另一回事。

3/ 正如我在上面指出的线程中提到的(请参阅该答案中的推理),从云函数内存中加载数据是一个冒险的想法。我建议您再考虑一下您的算法的那一部分。

4/ 如果成功,将加载的文件/对象从“输入”存储桶移动到某个“已处理”(或“存档”)存储桶,并将其移动到“失败”存储桶可能是个好主意桶,以防加载失败。那是在云函数代码中完成的。失败结果也可以反映在 firestore 文档中(即将__state 字段的值设置为FAILURE)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-06-09
    • 1970-01-01
    • 1970-01-01
    • 2012-12-08
    • 2019-06-29
    • 1970-01-01
    • 1970-01-01
    • 2016-07-13
    相关资源
    最近更新 更多