【问题标题】:Apache Beam python Bigquery change streaming insert into batch insert?Apache Beam python Bigquery将流式插入更改为批量插入?
【发布时间】:2019-06-06 18:40:06
【问题描述】:

我正在运行一个 Apache Beam 数据流作业,它从存储桶中读取数据,执行一些转换并写入 bigquery。 但是记录被插入到流缓冲区中。

validated_data = (p1
                  | 'Read files from Storage '+url >> beam.io.ReadFromText(url)
                  | 'Validate records ' + url >> beam.Map(data_ingestion.validate, url)\
                  .with_outputs(SUCCESS_TAG_KEY, FAILED_TAG_KEY, main="main")
)
all_data, _, _ = validated_data
success_records = validated_data[SUCCESS_TAG_KEY]
failed_records = validated_data[FAILED_TAG_KEY]


(success_records
 | 'Extracting row from tagged row {}'.format(url) >> beam.Map(lambda row: row['row'])
 | 'Write to BigQuery table for {}'.format(url) >> beam.io.WriteToBigQuery(
            table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
)

其实我在上面运行之前需要先删除分区,这样可以避免摄取时间分区表出现重复记录。

如果我为同一个文件运行此作业超过 1 次,而不截断表格,表格最终会出现重复记录。

因为最后的记录在流缓冲区中,删除分区表命令实际上并没有删除分区。 下面是我用来截断表格的代码。并且这段代码在运行管道之前运行

client = bigquery.Client()
dataset = TABLE_MAP['dataset']
table = TABLE_MAP[sentiment_pipeline][table_type]['table']
table_id = "{}${}".format(table, format_date(run_date, '%Y%m%d'))
table_ref = client.dataset(dataset).table(table_id)
output = client.delete_table(table_ref)

【问题讨论】:

  • 您可以通过以下选项控制如何通过 BigQuery 插入数据:- FILE_LOADS:beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/… - STREAMING_INSERTS:beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/…
  • 我不确定是否同样适用于 apache beam python sdk!
  • 你好迪帕克。事实上,对于批处理管道,Dataflow 仅支持文件加载 - 而对于流式管道,它仅支持流式插入。如果您正在运行补丁管道。另一方面,如果您使用的是直接运行器,那么不幸的是,无论如何只支持流式插入。您使用的是什么运行器,为什么要重新运行批处理管道?
  • 嗨,Pablo,我正在使用数据流运行器。我相信用于 apach beam 的 python sdk 仅具有将插入到 bigquery 中的功能,apche Beam 内部使用 insertAll 方法,该方法仅在流模式下插入数据。 Apache beam python sdk 没有任何选项来指定加载作业
  • 我基本上是在谈论这个github.com/apache/beam/commit/…。但是到目前为止,这仅在 apache beam 的 git repo 中可用,可从 pip 获得的 apache beam 版本没有这些更改。在撰写此评论时,pip 仅支持 apach-beam==2.10,而相同的 beam-release 标签没有这些更改。

标签: python google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

根据 BigQuery 文档,您可能需要等待 30 minutes in order to make a DML statement on a a streaming table,以及类似 delete/truncate tables might result in data loss for some scenarios 的架构更改。 Here 是一些解决方法,您可以尝试在流式处理方案中处理重复项。

此外,Apache BeamDataflow 现在支持 python 的批量插入,因此这可能是避免流限制的好方法。

【讨论】:

    猜你喜欢
    • 2021-08-19
    • 1970-01-01
    • 1970-01-01
    • 2018-05-22
    • 1970-01-01
    • 1970-01-01
    • 2022-11-26
    • 1970-01-01
    • 2018-11-23
    相关资源
    最近更新 更多