【发布时间】:2020-05-14 05:29:56
【问题描述】:
我们正在从 KAFKA 流式传输数据并将其存储到 google Bigtable。
当vehcile_id 来自kafka 时,我必须检查bigtable 中vehcile_id 的数据是否已经存在。根据 bigtable 中车辆 ID 的日期时间,将计算行程 ID。
PCollection<String> ids = pipeline.apply(KafkaIO.<String,String>read().....)
PCollection<com.google.bigtable.v2.Row> BTread =pipeline.apply("read", BigtableIO.read().....)
对于实现上述要求的任何帮助将不胜感激。
谢谢。
【问题讨论】:
-
您能否详细说明一下您是如何计算trip_id 的?为什么 Vehicle_id=1,trip_id=a1?谢谢!
标签: google-cloud-dataflow pipeline