【问题标题】:How to process files serially in cloud function?如何在云功能中串行处理文件?
【发布时间】:2020-07-15 10:46:08
【问题描述】:

我写了一个基于云存储触发器的云函数。我有 10-15 个文件以 5 秒的间隔登陆云存储桶,它将数据加载到 bigquery 表中(截断并加载)。

虽然存储桶中有 10 个文件,但我希望云功能以顺序方式处理它们,即一次 1 个文件,因为所有文件都访问同一个表进行操作。

目前云功能一次被多个文件触发,并且由于多个文件试图访问同一个表,它在 BIgquery 操作中失败。

有没有办法在云功能中配置这个??

提前致谢!

【问题讨论】:

  • 所有文件都写在同一个表中吗?如果不是,您能否根据文件 prefix 或 GCS 中的不同路径来区分目标表?你每天有多少文件?
  • 是的,我们有一个表来加载所有文件,这是一个截断加载表。不,我不能创建多个表,因为它们将再次指向同一个最终表。我们每天最多接收 30 个文件,但可能会有所不同
  • 文件有特定的顺序吗?或者,您是否在截断加载后执行查询?
  • NO 没有接收或加载文件的特定顺序。是的,我们在分段加载后执行查询和转换活动。
  • 为什么需要按顺序处理它们?这使云架构复杂化,并限制了它的可扩展性。为了更好地理解这个问题,请阅读:cloud.google.com/pubsub/docs/ordering

标签: google-cloud-platform google-bigquery google-cloud-functions google-cloud-storage


【解决方案1】:

您可以通过使用 pubsub 和 Cloud Function 上的最大实例参数来实现。

编辑

感谢您的代码,我明白会发生什么。事实上,BigQuery 是一个声明式系统。当您执行请求或加载作业时,会创建一个作业并在后台运行。

在 python 中,你可以明确地等待工作结束,但是,对于 pandas,我没有找到方法!!

我刚刚找到了Google Cloud page to explain how to migrate from pandas to BigQuery client library。可以看到,最后有一行

# Wait for the load job to complete.
job.result()

然后等待工作结束。

您在 _insert_into_bigquery_dwh 函数中做得很好,但在暂存 _insert_into_bigquery_staging 函数中却不是这样。这可能会导致 2 个问题:

  • dwh 函数处理旧数据,因为触发此作业时暂存尚未完成
  • 如果暂存需要 10 秒并在“后台”运行(您不会在代码中明确等待结束)并且 dwh 需要 1 秒,则在 dwh 结束时处理下一个文件功能,即使登台继续在后台运行。这会导致您的问题。

【讨论】:

  • 感谢 guillaume 的回复,但您能否建议我按照cloud.google.com/solutions/… 实现架构时如何实现不并发
  • hmmm,查看文档后,你可以实现同样的事情。忘记 pubsub 并只为函数设置最大实例。它应该按原样工作。
  • 等等,同一个云函数必须执行加载和查询。如果不是,则不存在一致性,因为 PubSub 将流程解耦。我的意思是,如果您在 streaming_success 函数中执行查询,它将不起作用。在流媒体功能中完成所有工作。
  • 是的,我在流媒体功能中完成了这一切。当文件落入存储桶并触发云函数时,它将文件加载到临时表中,然后执行一些查询操作和数据清理,并加载到最终的 BQ 表中。但是,如果同时又有一个文件进入存储桶,该文件也尝试访问同一个 BQ 暂存表,并且该过程失败。仅供参考,我已经在云函数中将我的最大实例保持为 1
  • 您已经将最大实例设置为 1 并且存在并发问题?
【解决方案2】:

您描述的架构与您 linked 的文档中的架构不同。请注意,在流程图和代码示例中,存储事件会触发云功能,该功能会将数据直接流式传输到目标表。由于 BigQuery 允许多个流式插入作业,因此可以同时执行多个函数而不会出现问题。在您的用例中,用于加载 write-truncate 以进行数据清理的中间表会产生很大的不同,因为每次执行都需要前一个执行完成,因此需要顺序处理方法。

我想指出,PubSub 不允许配置发送消息的速率,如果有 10 条消息到达主题,即使一次处理一条,它们也会全部发送给订阅者。由于上述原因,将函数限制为一个实例可能会导致开销,并且还会增加延迟。也就是说,由于预期的工作量是每天 15-30 个文件,因此上述问题可能不是什么大问题。

如果您希望并行执行,您可以尝试为每条消息创建一个新表,并使用table.expires(exp_datetime) setter 方法为其设置一个较短的到期期限,这样多个执行就不会相互冲突。这里是相关库reference。否则,纪尧姆的出色回答将完全完成工作。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-04-13
    • 2018-07-03
    • 1970-01-01
    • 2020-10-12
    • 1970-01-01
    • 2019-10-27
    • 1970-01-01
    • 2019-01-24
    相关资源
    最近更新 更多