【问题标题】:Apache Beam DoFn with query to Google Datastore is slow in cloud -pythonApache Beam DoFn 对 Google 数据存储的查询在云中很慢-python
【发布时间】:2018-08-24 04:54:50
【问题描述】:

我想使用 Apache Beam 将实体更新到数据存储区,但在执行 WriteToDatastore 之前,我创建了一个自定义 DoFN - 从导入步骤中获取实体, - 检查数据存储区中是否存在实体 - 如果是,则提取属性的值并将该值与新实体连接。 - 输出实体

示例:我的数据由以下列组成:parent_id、nationality & child_name。输入数据是新出生的孩子。在更新插入数据存储之前,我想获取父母现有的孩子并附加新值。

with beam.Pipeline(options=options) as p:

    (p | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
     | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict())
     | 'Create entities' >> beam.ParDo(CreateEntities())
     | 'Update entities' >> beam.ParDo(UpdateEntities())
     | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
     )

花费时间最多的 Pardo 是更新实体:

class UpdateEntities(beam.DoFn):
"""Updates Datastore entity"""
def process(self, element):
    query = query_pb2.Query()
    parent_key = entity_pb2.Key()
    parent = datastore_helper.get_value(element.properties['parent_id'])
    datastore_helper.add_key_path(parent_key, kind, parent)
    parent_key.partition_id.namespace_id = datastore_helper.get_value(element.properties['nationality'])
    query.kind.add().name = kind
    datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.EQUAL, parent_key)


    req = helper.make_request(project=PROJECT, namespace=parent_key.partition_id.namespace_id,query=query)
    resp = helper.get_datastore(PROJECT).run_query(req)

    if len(resp.batch.entity_results) > 0:
        existing_entity = resp.batch.entity_results[0].entity
        existing_child_name_v = datastore_helper.get_value(existing_entity.properties['child_name'])
        new_child_names = existing_child_name_v + ';' + datastore_helper.get_value(element.properties['child_name'])
        datastore_helper.set_value(element.properties['child_name'],new_child_names)
        return [element]
    else:
        return [element]

【问题讨论】:

    标签: python google-cloud-datastore google-cloud-dataflow apache-beam


    【解决方案1】:

    UpdateEntities 是光束流中最慢的部分,这并不奇怪。您在每次调用 UpdateEntities 时都会执行一次 RPC(您应该使用 get/lookup 而不是对键的查询,因为对键的查询最终是一致的)。只要您在 UpdateEntities 中执行 RPC,它将是您工作中最慢的部分。

    【讨论】:

    • 谢谢。您是说我最好将上述查询替换为 ReadFromDatastore 转换作为管道的侧输入,然后进行查找?还是别的什么?
    • 我实际上并没有建议的修复方法;)也就是说,是的,您需要找到一种方法将所有负载与其他工作并行执行,然后将查找结果与你的其他步骤。我说的重要一点是,如果 ParDo fn 中有 RPC,那么与没有任何 RPC 的 ParDo 函数相比,它会很慢。
    猜你喜欢
    • 2017-02-24
    • 2021-10-22
    • 1970-01-01
    • 2011-04-22
    • 1970-01-01
    • 1970-01-01
    • 2013-06-29
    • 1970-01-01
    • 2020-08-16
    相关资源
    最近更新 更多