【Question title】: Writing to BigQuery from within a ParDo function
【Posted】: 2017-02-02 11:39:45
【Question】:

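I want to call a beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function in order to generate a separate BigQuery table for each key in the PCollection (I am using the Python SDK). Here are two similar threads, which unfortunately did not help: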

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2) Dynamic table name when writing to BQ from dataflow pipelines

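When I execute the following code, the rows for the first key are inserted into BigQuery, and then the pipeline fails with the error below. I would appreciate any suggestions about what I am doing wrong or how to fix it.

Pipeline code: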

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query))

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
                        beam.io.BigQuerySink(
                            'PROJECT-NAME:analytics.first_table', # will be replaced by a dynamic name based on key
                            schema=schema,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                            )
            )
        ### End block ######
        return [value] 


### Following part works fine ###
filtered = (rows    | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
                    | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
                    | 'group_by_key' >> beam.GroupByKey() 
                    | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
                    | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing
                )

### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            'PROJECT-NAME:analytics.another_table',
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table.
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})}
ERROR:root:Error while visiting par_upload_to_bigquery
Traceback (most recent call last):
  File "split_events.py", line 137, in <module>
    run()
  File "split_events.py", line 132, in run
    p.run()
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo
    runner.process(v)
  File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483)
  File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311)
  File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677)
  File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process
    return self.run(self.dofn.process, context, args, kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run
    result = method(context, *args, **kwargs)
  File "split_events.py", line 73, in process
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__
    return _MaterializePValues(cache).visit(result)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit
    return self._pvalue_cache.get_unwindowed_pvalue(node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue
    return [v.value for v in self.get_pvalue(pvalue)]
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue
    value_with_refcount = self._cache[self.key(pvalue)]
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']"

Edit (after the first answer):

I did not realize that my value had to be a PCollection.

I have now changed my code to this (which is probably very inefficient):

key_pipe = p | 'pipe_' + key >> beam.Create(value)
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..))

This now works locally, but not with BlockingDataflowPipelineRunner :-(

The pipeline fails with the following error:

    JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331)
    op.start()
  File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193)
    def start(self):
  File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499)
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5)

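【Comments】:

  • I think the first answer is still correct: you cannot add more steps to a pipeline from a DoFn that is running as part of that pipeline. The fact that this works in the DirectRunner is a bug. As the other threads suggest, if you want to perform this kind of data-dependent write, you need to interact with the BigQuery API directly for now, rather than use BigQuerySink.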
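
The point made in this comment, that the set of steps is fixed before execution starts, can be illustrated with a toy model. This is a minimal sketch only: `ToyPipeline`, `add_step` and `write_per_element` are hypothetical names invented for illustration, and this is not how Beam is implemented. Real runners do not raise such a tidy error, as the tracebacks in the question show.

```python
class ToyPipeline(object):
    """Toy stand-in for a pipeline: steps are registered first, run later."""

    def __init__(self):
        self.steps = []       # (label, function) pairs, in registration order
        self.running = False

    def add_step(self, label, fn):
        # Graph construction is only allowed before run() has started.
        if self.running:
            raise RuntimeError(
                "cannot add step %r while the pipeline is running" % label)
        self.steps.append((label, fn))

    def run(self, elements):
        self.running = True
        try:
            for _label, fn in self.steps:
                elements = [fn(e) for e in elements]
            return elements
        finally:
            self.running = False


pipeline = ToyPipeline()
pipeline.add_step("double", lambda x: x * 2)


def write_per_element(element):
    # Mimics applying a new transform from inside a DoFn's process():
    # it tries to extend the graph while the graph is being executed.
    pipeline.add_step("write_%s" % element, lambda e: e)
    return element


pipeline.add_step("write_per_element", write_per_element)

try:
    pipeline.run([1, 2])
    message = None
except RuntimeError as err:
    message = str(err)
```

In this toy model the run stops at the first element that reaches `write_per_element`, because by then the list of steps is already being executed.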

Tags: google-bigquery google-cloud-dataflow apache-beam


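【Solution 1】:

In the similar threads, the only suggestion for performing a BigQuery write inside a ParDo is to use the BigQuery API directly, or to use a client.

The code you wrote places the Dataflow transform beam.io.Write(beam.io.BigQuerySink()) inside a DoFn. That transform expects to be applied to a PCollection, like filtered in the working code example. That is not the case in the failing code, which applies it to value.

I think the easiest option is to look at the gcloud-python BigQuery function insert_data() and call it from within the ParDo.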
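
A minimal sketch of that suggestion follows, under several assumptions. `WriteRowsPerKey`, `table_for_key`, `client_factory` and the naming scheme are hypothetical and not taken from the question. The client is assumed to expose a method that behaves like `insert_rows_json` in the current google-cloud-bigquery library, which returns a list of per-row errors (empty on success); the older gcloud-python `insert_data()` mentioned above played a similar role. The class is kept as a plain object so the sketch runs without Beam installed; in a pipeline it would subclass `beam.DoFn`. Note that in the SDK version used in the question, `process` receives a context and reads `context.element` instead of taking the element directly.

```python
import re


def table_for_key(key, dataset="PROJECT-NAME.analytics"):
    """Build a destination table ID from a key (hypothetical naming scheme)."""
    # Letters, digits and underscores are always valid in a table name,
    # so every other character is replaced with an underscore.
    safe = re.sub(r"[^A-Za-z0-9_]", "_", str(key))
    return "%s.%s" % (dataset, safe)


class WriteRowsPerKey(object):
    """Writes the rows of each (key, rows) element to a per-key table.

    In a real pipeline this would subclass beam.DoFn; it is a plain class
    here so that the sketch runs without Beam installed.
    """

    def __init__(self, client_factory, batch_size=500):
        # The factory is called on the worker rather than at construction
        # time, because client objects generally cannot be pickled.
        self._client_factory = client_factory
        self._batch_size = batch_size
        self._client = None

    def start_bundle(self):
        if self._client is None:
            self._client = self._client_factory()

    def process(self, element):
        key, rows = element
        table = table_for_key(key)
        rows = list(rows)
        for start in range(0, len(rows), self._batch_size):
            batch = rows[start:start + self._batch_size]
            # Assumed contract: returns a list of per-row errors,
            # which is empty when every row was accepted.
            errors = self._client.insert_rows_json(table, batch)
            if errors:
                raise RuntimeError(
                    "insert into %s failed: %s" % (table, errors))
        # Emit a small summary so downstream steps can count what was written.
        yield key, len(rows)
```

With Beam and google-cloud-bigquery installed, a DoFn like this would be applied after the GroupByKey step, for example as `beam.ParDo(WriteRowsPerKey(bigquery.Client))`. Streaming inserts do not create the destination table, so each per-key table would have to exist (or be created) before the first insert. Later Beam releases also let `beam.io.WriteToBigQuery` take a callable for its `table` argument, which may remove the need for a hand-written DoFn.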

【Comments】:
