【问题标题】:Datastore queries in Dataflow DoFn slow down pipeline when run in the cloud在云中运行时,Dataflow DoFn 中的数据存储查询会减慢管道速度
【发布时间】:2017-02-24 06:24:35
【问题描述】:

我正在尝试通过在 DoFn 步骤中查询数据存储来增强管道中的数据。 CustomClass 类中的对象的字段用于对 Datastore 表进行查询,返回的值用于增强对象。

代码如下所示:

public class EnhanceWithDataStore extends DoFn<CustomClass, CustomClass> {

private static Datastore datastore = DatastoreOptions.defaultInstance().service();
private static KeyFactory articleKeyFactory = datastore.newKeyFactory().kind("article");

@Override
public void processElement(ProcessContext c) throws Exception {

    CustomClass event = c.element();

    Entity article = datastore.get(articleKeyFactory.newKey(event.getArticleId()));

    String articleName = "";
    try{
        articleName = article.getString("articleName");         
    } catch(Exception e) {}

    CustomClass enhanced = new CustomClass(event);
    enhanced.setArticleName(articleName);

    c.output(enhanced);
}

当它在本地运行时,这很快,但是当它在云中运行时,这一步会显着减慢管道。这是什么原因造成的?有没有解决方法或更好的方法来做到这一点?

可以在这里找到管道的图片(​​最后一步是增强步骤): pipeline architecture

【问题讨论】:

  • 如果你愿意分享一个job id,我们可以直接看一下。
  • 嗨,Kenn,工作 ID 是:2016-10-14_10_34_39-5525093815482139851。感谢您查看这个。代码看起来不错(这是从 Dataflow 查询 Datastore 的最佳做法)?
  • 再看一遍后,我提供了我认为最好的答案,然后再尝试在较低级别进行调试。

标签: java google-cloud-datastore google-cloud-dataflow


【解决方案1】:

您在此处所做的是将您的输入 PCollection&lt;CustomClass&gt; 与 Datastore 中的增强功能相结合。

对于 PCollection 的每个分区,对 Datastore 的调用将是单线程的,因此会产生很多延迟。我希望这在DirectPipelineRunnerInProcessPipelineRunner 中也会很慢。通过自动缩放和动态工作再平衡,您应该在 Dataflow 服务上运行时看到并行性,除非您的结构导致我们优化不佳,因此您可以尝试增加 --maxNumWorkers。但是您仍然不会从批量操作中受益。

在你的管道中表达这个连接可能会更好,使用DatastoreIO.readFrom(...) 后跟CoGroupByKey 转换。这样,Dataflow 将对所有增强功能进行批量并行读取,并使用高效的GroupByKey 机制将它们与事件对齐。

// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);

// Key them both by the common id
PCollection<KV<Long, CustomClass>> keyedEvents =
    events.apply(WithKeys.of(event -> event.getArticleId()))

PCollection<KV<Long, Entity>> =
    articles.apply(WithKeys.of(article -> article.getKey().getId())

// Set up the join by giving tags to each collection
TupleTag<CustomClass> eventTag = new TupleTag<CustomClass>() {};
TupleTag<Entity> articleTag = new TupleTag<Entity>() {};
KeyedPCollectionTuple<Long> coGbkInput =
    KeyedPCollectionTuple
        .of(eventTag, keyedEvents)
        .and(articleTag, keyedArticles);

PCollection<CustomClass> enhancedEvents = coGbkInput
    .apply(CoGroupByKey.create())
    .apply(MapElements.via(CoGbkResult joinResult -> {
      for (CustomClass event : joinResult.getAll(eventTag)) {
        String articleName;
        try {
          articleName = joinResult.getOnly(articleTag).getString("articleName");
        } catch(Exception e) {
          articleName = "";
        }
        CustomClass enhanced = new CustomClass(event);
        enhanced.setArticleName(articleName);
        return enhanced;
      }
    });

另一种可能性,如果在内存中存储查找的文章非常少,则使用DatastoreIO.readFrom(...),然后通过View.asMap() 将它们作为地图侧输入读取并在本地表中查找它们。

// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);

// Key the articles and create a map view
PCollectionView<Map<Long, Entity>> = articleView
    .apply(WithKeys.of(article -> article.getKey().getId())
    .apply(View.asMap());

// Do a lookup join by side input to a ParDo
PCollection<CustomClass> enhanced = events
    .apply(ParDo.withSideInputs(articles).of(new DoFn<CustomClass, CustomClass>() {
      @Override
      public void processElement(ProcessContext c) {
        Map<Long, Entity> articleLookup = c.sideInput(articleView);
        String articleName;
        try {
          articleName =
              articleLookup.get(event.getArticleId()).getString("articleName");
        } catch(Exception e) {
          articleName = "";
        }
        CustomClass enhanced = new CustomClass(event);
        enhanced.setArticleName(articleName);
        return enhanced;
      }
    });

根据您的数据,其中任何一个都可能是更好的选择。

【讨论】:

  • 嗨,肯恩,感谢您的回答。我很快就会试一试。我在这个解决方案中看到的唯一问题是,如果 Datastore 得到更新(我们可能会在运行管道时写入它),这些更改将不会在管道中可用?有没有办法解决这个问题(无需重新运行管道,假设我们以流模式运行它)?
  • 你是对的 - 如果它在管道运行时得到更新,你将获得一个特定的快照。您目前无法使用流式管道解决此问题,因为没有用于读取更改的 Datastore API。
  • 好的,感谢您的更新。是否有另一种解决方案可以存储更大的元数据数据集(不适合内存)并且在管道运行时动态更新? (不需要数据存储)
  • 我们设法查明了问题:该项目位于欧盟 - 因此数据存储区默认位于同一位置;而 Dataflow 作业默认托管在美国(我没有覆盖此选项)。很抱歉用这个(可能是微不足道的)问题打扰你 - 猜你必须以艰难的方式学习这个。仅供参考:它在同一位置的执行速度要快 25-30 倍; ~40 个元素/秒,而 15 名工人的 ~1200 个元素/秒。
  • 我建议将您的评论写为补充答案 - 我认为人们会从中获得价值。
【解决方案2】:

经过一番检查,我设法查明了问题:项目位于欧盟(因此,数据存储区位于欧盟区;同样作为 AppEningine 区域),而 Dataflow 作业 本身(以及工作人员)默认托管在美国(当不覆盖区域选项)。

性能差异为 25-30 倍:约 40 个元素/秒,而 15 名工人约 1200 个元素/秒。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多