【问题标题】:Apache beam - Google Dataflow - WriteToBigQuery - Python - Parameters - Templates - PipelinesApache 梁 - 谷歌数据流 - WriteToBigQuery - Python - 参数 - 模板 - 管道
【发布时间】:2020-12-07 19:38:14
【问题描述】:

我有 2 个关于我的发展的问题。

问题 1

我正在尝试从 python 代码创建一个模板,该模板包括读取 BigQuery 表、应用一些转换并写入不同的 BigQuery 表(可以存在或不存在)。

关键是我需要将目标表作为参数发送,但看起来我不能在管道方法 WriteToBigQuery 中使用参数,因为它会引发以下错误消息:apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: project_target, type: str, default_value: 'Test').get() 不是从运行时上下文中调用的

方法 1

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> Write(WriteToBigQuery(
               table=custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
        schema=custom_options.target_schema.get(),
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)

方法 2

我创建了一个 ParDo 函数以获取变量并设置 WriteToBigQuery 方法。然而,尽管管道执行成功完成并且看到输出正在返回行(理论上是写的),但我看不到表或插入的数据。

    with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Pre-tasks" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....

            | 'Write table 2' >> Write(WriteToBigQuery())

我尝试了 2 种方法,但均无济于事:BigQueryBatchFileLoads 和 WriteToBigQuery

class writeTable(beam.DoFn):
def process(self, element):
    try:
        #Load first here the parameters from the custom_options variable (Here we can do it)

        result1 = Write(BigQueryBatchFileLoads(destination=target_table,
                                     schema=target_schema,
                                     write_disposition=BigQueryDisposition.WRITE_APPEND,
                                     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
        
        result2 = WriteToBigQuery(table=target_table,
                        schema=target_schema,
                        write_disposition=BigQueryDisposition.WRITE_APPEND,
                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                        method="FILE_LOADS"
                        ) 

问题 2

我还有一个疑问是,如果在最后一个 ParDo 类中,我需要像在最后一个管道步骤中那样返回元素或 result1 或 result2。

感谢您对此的帮助。

【问题讨论】:

  • * 有关成功执行的更多详细信息:请参阅下面的链接以查看方案 2 中的管道执行工作正常并且返回行,但是 BigQuery 中的表和数据均不可用。 i.stack.imgur.com/GCueP.png
  • * 有关方法 2 的更多详细信息:我在某处读到我需要执行以下步骤,但不确定如何执行:“一旦将其移出 DoFn,您需要应用PTransform beam.io.gcp.bigquery.WriteToBigQuery 到 PCollection 以使其产生任何效果”。对吗?

标签: python templates google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

执行此操作的最可取的方法类似于 #1,但传递值提供程序而不调用 get,并为表传递一个 lambda:

with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> WriteToBigQuery(
               table=lambda x: custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
        schema=custom_options.target_schema,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)

这应该可行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-07-20
    • 1970-01-01
    • 1970-01-01
    • 2015-04-14
    • 1970-01-01
    • 2022-08-17
    • 1970-01-01
    相关资源
    最近更新 更多