【问题标题】:Load a huge data from BigQuery to python/pandas/dask从 BigQuery 加载大量数据到 python/pandas/dask
【发布时间】:2019-07-28 17:43:18
【问题描述】:

我阅读了其他类似的主题并搜索了 Google 以找到更好的方法,但找不到任何可行的解决方案。

我在 BigQuery 中有一个大表(假设每天插入 2000 万行)。我想在 python/pandas/dask 中有大约 2000 万行数据和大约 50 列来做一些分析。我曾尝试使用 bqclient、panda-gbq 和 bq 存储 API 方法,但在 python 中拥有 500 万行需要 30 分钟。还有其他方法吗?是否有任何 Google 服务可以做类似的工作?

【问题讨论】:

标签: pandas google-cloud-platform google-bigquery bigdata dask


【解决方案1】:

一些选项:

  • 在导出(较小的表)之前尝试在 BigQuery SQL 中进行聚合等 熊猫。
  • 在 Google Cloud 上运行 Jupyter 笔记本,在与 BigQuery 位于同一区域的高内存机器上使用深度学习虚拟机 数据集。这样可以最大限度地减少网络开销。

【讨论】:

  • bigquery 中的数据不够干净,无法进行任何类型的聚合!关于第二个选项,脚本将是一个日常的 etl 工作,我们也想降低成本。那种 vm Instant 对我们来说太贵了!!
  • “bigquery 中的数据不够干净,无法进行任何类型的聚合”这很难相信。很多人在执行聚合之前使用 BigQuery 来清理数据。不过,由于您没有向我们展示您的数据是什么样的,因此很难提供建议。
  • @elliott 我们不想使用 bigquery(因为成本)来清理数据,如果这样做更有意义的话。
  • 如果每行大小为 50 kB,则 2000 万行意味着数据大小为 1 TB,因此每天扫描和清理整个表将花费 5 美元。这些行是否很大?
【解决方案2】:

首先,您应该对您的代码进行概要分析,以找出花费时间的原因。它只是在等待大查询来处理您的查询吗?是不是数据的下载> 你的带宽是多少,你用的是几分之一?是否将这些数据解析到内存中?

由于您可以使 SQLAlchemy 支持大查询 (https://github.com/mxmzdlv/pybigquery),因此您可以尝试使用 dask.dataframe.read_sql_table 将查询拆分为多个分区并并行加载/处理它们。如果大查询限制了单个连接或单个机器的带宽,您可以通过在分布式集群上运行它来获得更好的吞吐量。

实验!

【讨论】:

  • 投反对票,因为该建议将运行n bq 表扫描
  • 如果您的分区与数据的分片模型相匹配,则不会
  • 是的,好点;不过,确实值得一提。
【解决方案3】:

您可能想先将数据导出到 Google Cloud Storage,然后将数据下载到本地计算机并加载。 以下是您需要采取的步骤:

  • 创建一个包含您想要的数据的中间表 出口。您可以选择并存储到中间表。
  • 将中间表以 JSON/Avro/Parquet 格式导出到 Google Cloud Storage。
  • 下载您导出的数据并加载到您的 python 应用程序。

除了将数据下载到本地计算机之外,您还可以使用 PySpark 和 SparkSQL 进行处理。将数据导出到 Google Cloud Storage 后,您可以启动 Cloud Dataproc 集群并将数据从 Google Cloud Storage 加载到 Spark,并在那里进行分析。

您可以在此处阅读示例

https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

您还可以在 Dataproc 集群中启动 Jupyter Notebook

https://cloud.google.com/dataproc/docs/tutorials/jupyter-notebook

希望这会有所帮助。

【讨论】:

    【解决方案4】:

    您可以随时将内容导出到云存储 -> 本地下载 -> 加载到您的 dask/pandas 数据帧中,而不是查询:

    1. 导出 + 下载:

      bq --location=US extract --destination_format=CSV --print_header=false 'dataset.tablename' gs://mystoragebucket/data-*.csv &&  gsutil -m cp gs://mystoragebucket/data-*.csv /my/local/dir/ 
      
    2. 加载到 Dask:

      >>> import dask.dataframe as dd
      >>> df = dd.read_csv("/my/local/dir/*.csv")
      

    希望对你有帮助。

    【讨论】:

    • 试了一下,总文件大小在45G左右
    • 让我们知道它是否有效。另外,如果确实有帮助,请将答案标记为正确:-)。
    • 是的,dask 从谷歌存储做 read_json 也好得多,所以不需要在本地磁盘上复制。也请用 json 更新 csv,csv 不工作
    • 你知道 dask 分布式是否将 read_csv() 的任务拆分到工作人员之间?
    • 可能是这样。你必须检查它的文档。
    【解决方案5】:

    晚了几年,但我们正在开发一个新的dask_bigquery 库,以帮助轻松地在 BQ 和 Dask 数据帧之间来回移动。看看吧,让我们知道你的想法!

    【讨论】:

      猜你喜欢
      • 2014-07-09
      • 2023-03-26
      • 1970-01-01
      • 2018-12-04
      • 2017-11-16
      • 1970-01-01
      • 2018-07-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多