【问题标题】:Google data flow PCollection join谷歌数据流 PCollection 加入
【发布时间】:2020-05-14 05:29:56
【问题描述】:

我们正在从 KAFKA 流式传输数据并将其存储到 google Bigtable。

在写入大表之前,需要根据同一张表中的现有值计算一个值。

当vehcile_id 来自kafka 时,我必须检查bigtable 中vehcile_id 的数据是否已经存在。根据 bigtable 中车辆 ID 的日期时间,将计算行程 ID。

    PCollection<String> ids = pipeline.apply(KafkaIO.<String,String>read().....)
    PCollection<com.google.bigtable.v2.Row> BTread =pipeline.apply("read", BigtableIO.read().....)

对于实现上述要求的任何帮助将不胜感激。

谢谢。

【问题讨论】:

  • 您能否详细说明一下您是如何计算trip_id 的?为什么 Vehicle_id=1,trip_id=a1?谢谢!

标签: google-cloud-dataflow pipeline


【解决方案1】:

我认为您可以采取的方法很少。

如果您需要考虑 BigTable 中已有的数据,您可以创建第二个输入源并将其作为 side input 输入到变换中,或使用 CoGroupByKey 变换来组合两个输入。

如果您需要考虑管道已经写入的数据,您可能需要维护一些状态来跟踪已经写入的车辆 ID。您可以考虑将Beam state API 用于后者(请注意,状态对于窗口和键来说是唯一的)。

我不确定这些中的任何一个是否完全适合您的应用程序,但想提供一些您可以探索的建议。

【讨论】:

    猜你喜欢
    • 2016-10-15
    • 1970-01-01
    • 2021-01-10
    • 2022-01-16
    • 2018-07-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-14
    相关资源
    最近更新 更多