【问题标题】:How to read multiple datastore kinds in cloud dataflow python pipeline如何在云数据流 python 管道中读取多种数据存储类型
【发布时间】:2019-01-04 18:54:13
【问题描述】:

我正在尝试从我的 python 管道中的默认命名空间读取多个数据存储类型,并希望对其进行处理。我编写的函数在本地使用 DirectRunner 运行良好,但是当我使用 DataflowRunner 在云上运行管道时,其中一种(包含 1500 条记录)读取速度非常快,而另一种(包含数百万条记录)读取速度非常慢.

作为参考,当我尝试在管道中读取一种(包含数百万条记录)时,它需要 10 分钟,但是当它们一起执行时,它花了将近 1 小时,但它仍然只处理了 1/10 的记录。

我无法弄清楚问题出在哪里。

这是我的代码

def read_from_datastore(project,user_options, pipeline_options):
  p = beam.Pipeline(options=pipeline_options)
  query = query_pb2.Query()
  query.kind.add().name = user_options.kind   #reading 1st kind this is the one with million records

  students = p | 'ReadFromDatastore' >> ReadFromDatastore(project=project,query=query)

  query = query_pb2.Query()
  query.kind.add().name = user_options.kind2   #reading 2nd kind this is the one with 1500 records

  courses = p | 'ReadFromDatastore2' >> ReadFromDatastore(project=project,query=query)

  open_courses = courses | 'closed' >> beam.FlatMap(filter_closed_courses)
  enrolled_students = students | beam.ParDo(ProfileDataDumpDataFlow(),AsIter(open_courses))

如果有人知道为什么会发生这种情况,请告诉我。

【问题讨论】:

  • 你能分享你指定的pipeline options吗?尤其是 num_workers、max_num_workers 和 machine_type。
  • 您可以查找this 示例如何使用 Dataflow 进行关系连接。
  • 嘿@Yurci,管道选项是默认的 Google Dataflow 选项,无需修改。

标签: google-cloud-platform google-cloud-datastore google-cloud-dataflow apache-beam apache-beam-io


【解决方案1】:

我看到您正在执行两种连接操作。为此,如果您export entities to a bucket 然后将其加载到BigQuery,它将更合适、更快。在 BigQuery 中进行所需的联接操作。

它不是在你的工作中读取实体,它是连接操作。

【讨论】:

  • 我不认为加入是这里的问题。当我在评论 ParDo 后尝试一起阅读这两个实体并直接打印它们时,情况仍然相同。大约 1500 门课程几乎可以立即阅读,而拥有数百万条记录的学生则需要大量时间。或者,当我对一些 200-300 门课程进行硬编码并运行它时,Join 可以完美运行,我会在大约 15-20 分钟内获得所有学生数据的输出。
猜你喜欢
  • 2013-09-08
  • 2020-12-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-20
  • 2017-08-17
  • 2017-11-09
  • 2019-08-02
相关资源
最近更新 更多