您在此处所做的是将您的输入 PCollection<CustomClass> 与 Datastore 中的增强功能相结合。
对于 PCollection 的每个分区,对 Datastore 的调用将是单线程的,因此会产生很多延迟。我希望这在DirectPipelineRunner 和InProcessPipelineRunner 中也会很慢。通过自动缩放和动态工作再平衡,您应该在 Dataflow 服务上运行时看到并行性,除非您的结构导致我们优化不佳,因此您可以尝试增加 --maxNumWorkers。但是您仍然不会从批量操作中受益。
在你的管道中表达这个连接可能会更好,使用DatastoreIO.readFrom(...) 后跟CoGroupByKey 转换。这样,Dataflow 将对所有增强功能进行批量并行读取,并使用高效的GroupByKey 机制将它们与事件对齐。
// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);
// Key them both by the common id
PCollection<KV<Long, CustomClass>> keyedEvents =
events.apply(WithKeys.of(event -> event.getArticleId()))
PCollection<KV<Long, Entity>> =
articles.apply(WithKeys.of(article -> article.getKey().getId())
// Set up the join by giving tags to each collection
TupleTag<CustomClass> eventTag = new TupleTag<CustomClass>() {};
TupleTag<Entity> articleTag = new TupleTag<Entity>() {};
KeyedPCollectionTuple<Long> coGbkInput =
KeyedPCollectionTuple
.of(eventTag, keyedEvents)
.and(articleTag, keyedArticles);
PCollection<CustomClass> enhancedEvents = coGbkInput
.apply(CoGroupByKey.create())
.apply(MapElements.via(CoGbkResult joinResult -> {
for (CustomClass event : joinResult.getAll(eventTag)) {
String articleName;
try {
articleName = joinResult.getOnly(articleTag).getString("articleName");
} catch(Exception e) {
articleName = "";
}
CustomClass enhanced = new CustomClass(event);
enhanced.setArticleName(articleName);
return enhanced;
}
});
另一种可能性,如果在内存中存储查找的文章非常少,则使用DatastoreIO.readFrom(...),然后通过View.asMap() 将它们作为地图侧输入读取并在本地表中查找它们。
// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);
// Key the articles and create a map view
PCollectionView<Map<Long, Entity>> = articleView
.apply(WithKeys.of(article -> article.getKey().getId())
.apply(View.asMap());
// Do a lookup join by side input to a ParDo
PCollection<CustomClass> enhanced = events
.apply(ParDo.withSideInputs(articles).of(new DoFn<CustomClass, CustomClass>() {
@Override
public void processElement(ProcessContext c) {
Map<Long, Entity> articleLookup = c.sideInput(articleView);
String articleName;
try {
articleName =
articleLookup.get(event.getArticleId()).getString("articleName");
} catch(Exception e) {
articleName = "";
}
CustomClass enhanced = new CustomClass(event);
enhanced.setArticleName(articleName);
return enhanced;
}
});
根据您的数据,其中任何一个都可能是更好的选择。