【问题标题】:Is there a way to execute non-blocking load_job from BigQuery Python Client library?有没有办法从 BigQuery Python 客户端库执行非阻塞 load_job?
【发布时间】:2019-10-19 10:26:28
【问题描述】:

我有一个使用 Flask_restful、Flask_CORS 和 Marshmallow 的 Flask API。 API 执行一些工作以将 *.csv 文件获取到 Cloud Storage(使用 signedURL),确认它已上传,然后创建并执行加载作业以将 csv 从 Storage 传输到 BigQuery。 API 中加剧我脱发的部分是调用在 GCP 中执行加载作业,将 csv 文件加载到 BigQuery。这是代码的sn-p:

...
            dataset_ref = bq_client.dataset(target_dataset) 
            job_config.schema =  bq_schema 
            job_config.source_format = SOURCE_FORMAT 
            job_config.field_delimiter =  DELIM  
            job_config.destination_table_description = TARGET_TABLE
            job_config.encoding = ENCODING 
            job_config.max_bad_records = MAX_BAD_RECORDS
            job_config.autodetect = False # Do not autodetect schema
            load_job = bq_client.load_table_from_uri(
                uri, dataset_ref.table(target_table), job_config=job_config
            )  # API request
            load_job.result() # **<-- This is the concern**
            return {"message": "Successfully uploaded to Bigquery"}, 200

传输文件可能需要一些时间,我担心的是,在有一些延迟的期间,网络服务器会在等待传输发生时超时。我更希望让load_job.result() 执行,获取作业 ID 并返回 201 响应。然后我可以使用作业 ID 来轮询 GCP 以确定它是否成功,而不会有客户端前端的请求超时的风险,让用户对它是否成功感到困惑不是。

我知道 load_job.result() 是异步的,但是对于 Flask 没有帮助。我打算改用 Quart 来使用 async/await,但我的其他依赖项不受支持,因此我需要进行大量重构。还有其他人用来解决此类问题的方法吗? 干杯

【问题讨论】:

  • 我将重新表述,以确定您想要做什么:您想使用 Quart 在异步模式下运行加载作业,然后在作业完成时使用回调?跨度>
  • 这就是我想做的,但 Quart 不支持 flask_cors 或 flask_restful 所以我正在寻找替代品。到目前为止,我倾向于只使用 RQ 并有一个任务队列来执行此操作。

标签: python flask google-cloud-platform google-bigquery


【解决方案1】:

夸脱解决不了任何问题。事实上,Quart 仍然需要一个运行环境,它等待并监督阻塞函数,并在最后调用你的回调。您的函数必须仍在运行才能执行此操作。

对此有更好的设计。我建议你看看Cloud Task。流程如下:

  • 运行加载作业
  • 使用参数中的加载作业 ID 创建任务
  • 退出函数
  • 任务将触发另一个检查作业是否结束的函数
    • 如果尚未完成,则返回错误代码(不同于 2XX)。
    • 如果完成,返回 OK 返回码 (2XX)

您必须使用 retry policy 设置 Cloud Task 以不立即重试(例如将 min-backoff 设置为 30 秒)

【讨论】:

  • 谢谢纪尧姆。我不知道云任务。我尝试了两种解决方案。一种是使用 PubSub 来触发 Cloud Function。最后,我使用 Redis 和 RQ 建立了一个任务队列。使用 Cloud Tasks,您是否与云任务相关联,或者是否可以提升并转移到自托管或其他云解决方案?主要原因是一些客户可能已经拥有本地解决方案或其他云提供商,并希望坚持他们所知道的。
  • Cloud Task 产品与 Google Cloud 相关联。它以前嵌入到 App Engine 中,已被提取以供其他产品使用,但它仍然是 Google Cloud 的内部产品。但是,它并不是一个太聪明的工具:获取 URL 并执行 HTTP 调用。如果失败,按照重试策略重试,否则关闭任务。我确信存在许多等效产品。
  • 好的,谢谢纪尧姆。我相信解决方案的要点是以某种形式利用任务队列。
猜你喜欢
  • 2014-11-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-18
  • 1970-01-01
  • 1970-01-01
  • 2022-01-20
  • 1970-01-01
相关资源
最近更新 更多