【发布时间】:2021-02-23 14:19:53
【问题描述】:
我正在 Google Cloud Dataflow 中构建一个流程,该流程将使用 Pub/Sub 中的消息,并根据一个键的值将它们写入 BQ 或 GCS。我可以拆分消息,但不确定如何将数据写入 BigQuery。我试过使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。
我的完整代码在这里:https://pastebin.com/4W9Vu4Km
基本上我的问题是我不知道如何在WriteBatchesToBQ(第73行)中指定变量element应该写入BQ。
我也尝试过在管道中直接使用beam.io.gcp.bigquery.WriteToBigQuery(第 128 行),但随后出现错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] 。这可能是因为我给它的不是字典,而是字典列表(我想使用 1 分钟窗口)。
有什么想法吗? (如果代码中有一些太愚蠢的地方,请告诉我 - 我只是在短时间内使用 apache Beam,我可能会忽略一些明显的问题)。
【问题讨论】:
标签: python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam