【问题标题】:Get all elements in a PCollection regardless of tagging获取 PCollection 中的所有元素,而不考虑标记
【发布时间】:2020-02-02 05:56:49
【问题描述】:

我有一个 BigQuery TableRow 元素的 PCollection,这些元素的标记取决于 TableRow 的一列是否已成功解析。

final TupleTag<TableRow> OK = new TupleTag<TableRow>(){};
final TupleTag<TableRow> NOTOK = new TupleTag<TableRow>(){};

我的 ParDo 函数根据列解析标记这些 TableRow,并返回一个名为 myPCollection 的 PCollectionTuple。

我想做以下事情:

  1. 获取 PCollection 中的所有元素(标记为 OK 和 NOTOK),并将它们输出到 BigQuery。
  2. 仅获取标记为 NOTOK 的元素并将它们发送到 Pub/Sub

我知道我可以通过调用来完成 #2

myPCollection.get(NOTOK)

我找不到第 1 项的方法。我看到有一个名为 myPCollection.getAll() 的方法,但它返回的不是 PCollection,而是 Map,PCollection>

关于如何获取整个元素集(无论它们如何标记)的任何想法?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    您可以使用Flatten 转换 (Beam guide) 将不同的 PCollection 合并为一个:

    PCollection<String> okResults = myPCollection.get(OK);
    PCollection<String> notOkResults = myPCollection.get(NOTOK);
    
    PCollectionList<String> pcl = PCollectionList.empty(p);
    pcl = pcl.and(okResults).and(notOkResults);
    PCollection<String> allResults = pcl.apply(Flatten.pCollections());
    

    在这种情况下,allResults 将包含 OKNOTOK 元素。我做了一个例子(完整代码here),有两条输入线,它们被分类为好的或坏的侧面输出:

    Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
    INFO: All elements: bad line
    Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
    INFO: All elements: good line
    Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$3 processElement
    INFO: Ok element: good line
    Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$4 processElement
    INFO: Not Ok element: bad line
    

    使用 2.17.0 SDK 和 DirectRunner 测试。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-02-01
      • 2022-01-18
      • 1970-01-01
      • 1970-01-01
      • 2015-01-15
      • 1970-01-01
      • 2019-01-16
      相关资源
      最近更新 更多