【发布时间】:2018-11-05 15:34:24
【问题描述】:
我正在开发一个 Python 程序,以便像 Google Dataflow 模板一样使用。
我正在做的是从 PubSub 在 BigQuery 中写入数据:
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
(p
# This is the source of the pipeline.
| 'Read from PubSub' >> beam.io.ReadFromPubSub('projects/.../topics/...')
#<Transformation code if needed>
# Destination
| 'String To BigQuery Row' >> beam.Map(lambda s: dict(Trama=s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='Trama:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
)
p.run().wait_until_finish()
代码在本地运行,尚未在 Google Dataflow 中运行
这“有效”但不是我想要的方式,因为目前数据存储在 BigQuery 缓冲区流中,我看不到它(即使等待了一段时间)。
什么时候可以在 BigQuery 中使用? 为什么存储在缓冲流而不是“普通”表中?
【问题讨论】:
-
数据在那里,但您需要运行查询来检索它,而不是查看表预览,这不会显示流缓冲区中的数据。
-
谢谢。在发布 Q 之前,我尝试查看运行查询的数据:SELECT * FROM
TableLIMIT 1000 即使使用 C# 代码而不是使用 Google云控制台,但仍然丢失。我想这可能是因为我在本地运行这个管道,就像一个流管道它没有完成,所以要完成它我必须按下停止按钮。 Here 是我可以看到停止和排水之间的不同之处,但不确定这是否是问题
标签: python google-bigquery google-cloud-dataflow