【问题标题】:use ValueProvider to format a BigQuery in Dataflow使用 ValueProvider 在 Dataflow 中格式化 BigQuery
【发布时间】:2020-02-17 20:49:17
【问题描述】:

我目前正在使用 Dataflow 在 python 中进行循环批处理。

基本上我从 bigquery 读取数据并对其进行处理。我的管道看起来像这样

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )

我想使用数据流模板运行作业以使其适应运行时。

感谢文档 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates ,在您的函数部分使用 ValueProvider,我设法使用 ParDo 从运行时为“do_some_stuff”提供了一个额外的参数。


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

但我也想更改进程关注的用户数量,因此我想使查询适应运行时。


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_nb_users',
                                           default=100,
                                           type=int)
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

... 这不起作用,因为我在管道执行之前调用了 get()。到目前为止,我还没有设法将我为 do_some_stuff 函数所做的调整到“读取”行

任何关于如何进行的建议或解决方案将不胜感激。谢谢!

【问题讨论】:

    标签: python google-bigquery google-cloud-dataflow value-provider


    【解决方案1】:

    很遗憾,BigQuerySource 不支持值提供程序。这是因为它是在 Dataflow 运行器中本地实现的,因此所有信息都需要在管道构建时可用。

    您可以尝试转换 apache_beam.io.gcp.bigquery.ReadFromBigQuery - 这将允许您使用价值提供者。

    【讨论】:

    • 感谢 Pablo 的回答,我会找到另一种方法来做我需要的。
    • 您找到另一种写入 bq 表的方法了吗? @LouisDacquet
    • FWIW,写入 BQ 表不是问题。问题 ATM 正在模板化从 BQ 读取的管道,并在模板中更改要读取的表
    • 嗨@Pablo,我尝试使用apache_beam.io.gcp.bigquery.ReadFromBigQuery,但遇到了这个问题而不是stackoverflow.com/questions/68379038/…。您是否有成功将值提供者传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery 的示例?
    猜你喜欢
    • 2019-04-30
    • 2023-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多