【问题标题】:Apache Beam To BigQueryApache Beam 到 BigQuery
【发布时间】:2021-02-23 14:19:53
【问题描述】:

我正在 Google Cloud Dataflow 中构建一个流程,该流程将使用 Pub/Sub 中的消息,并根据一个键的值将它们写入 BQ 或 GCS。我可以拆分消息,但不确定如何将数据写入 BigQuery。我试过使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。

我的完整代码在这里:https://pastebin.com/4W9Vu4Km

基本上我的问题是我不知道如何在WriteBatchesToBQ(第73行)中指定变量element应该写入BQ。

我也尝试过在管道中直接使用beam.io.gcp.bigquery.WriteToBigQuery(第 128 行),但随后出现错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] 。这可能是因为我给它的不是字典,而是字典列表(我想使用 1 分钟窗口)。

有什么想法吗? (如果代码中有一些太愚蠢的地方,请告诉我 - 我只是在短时间内使用 apache Beam,我可能会忽略一些明显的问题)。

【问题讨论】:

    标签: python google-cloud-platform google-bigquery google-cloud-dataflow apache-beam


    【解决方案1】:

    WriteToBigQuery 示例格式如下:-

        project_id = "proj1"
        dataset_id = 'dataset1'
        table_id = 'table1'
        table_schema = ('id:STRING, reqid:STRING')
    
            | 'Write-CH' >> beam.io.WriteToBigQuery(
                                                        table=table_id,
                                                        dataset=dataset_id,
                                                        project=project_id,
                                                        schema=table_schema,
                                                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        ))
    

    你可以参考这个case,它会让你对beam data pipeline有一个简单的了解。

    【讨论】:

    • 嗨 Vibhor,很遗憾,这并没有帮助。我已经更新了第 127 行(例如 pastebin.com/8HUuCbLu),但我收到了问题中提到的 AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] (可能是因为 GroupWindowsIntoBatches 生成了字典列表而不是字典)。
    • 从哪里获得列表tagged_lines_result[Split.OUTPUT_TAG_BQ],通常在接近beam.io.WriteToBigQuery之前,数据应该已经在管道中解析。尝试参考我在帖子中分享的示例代码。在 ParDo 或管道中拆分记录,然后去写入数据。
    • 我在 pastebin.com/4W9Vu4Km 的第 118 行得到了它。我使用它的原因是因为我需要将管道分成两个流(一个到 BQ,另一个到 GCS)。我正在获取列表,因为我正在使用窗口功能。是否有可能以某种方式将字典列表提供给 WriteToBigQuery?
    • 是的,可以将列表加载到 BigQuery,但这取决于您希望如何加载。如果您想将完整数据加载为列表,则将列表映射到元素上并将数据加载到单个 STRING 字段。如果您想在每个 coll 中单独拆分列表的每个元素,请使用 ParDo 或在 Pipeline 中拆分它,并将每个元素映射到 BigQuery 的各个字段。
    • 我有一个字典列表,所有字典都有对应于目标表中列名的键。一个字典代表目标表中的一行。鉴于我目前所拥有的,你能给我一些关于最好使用哪些功能的建议吗?
    【解决方案2】:

    第二种方法是解决这个问题,你需要直接在管道中使用WriteToBigQuery函数。但是,需要包含一个 beam.FlatMap 步骤,以便 WriteToBigQuery 可以正确处理字典列表。

    因此完整的流水线拆分数据,按时间分组,写入BQ的定义如下:

     accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
                window_size) | "FlatMap" >> beam.FlatMap(
                lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
                                                                                                   schema=(
                                                                                                       output_table_bq_schema),
                                                                                                   write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                                                   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    

    完整的工作代码在这里:https://pastebin.com/WFwBvPcU

    【讨论】:

      猜你喜欢
      • 2018-12-07
      • 2021-02-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-21
      相关资源
      最近更新 更多