【发布时间】:2020-12-07 19:38:14
【问题描述】:
我有 2 个关于我的发展的问题。
问题 1
我正在尝试从 python 代码创建一个模板,该模板包括读取 BigQuery 表、应用一些转换并写入不同的 BigQuery 表(可以存在或不存在)。
关键是我需要将目标表作为参数发送,但看起来我不能在管道方法 WriteToBigQuery 中使用参数,因为它会引发以下错误消息:apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: project_target, type: str, default_value: 'Test').get() 不是从运行时上下文中调用的
方法 1
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Delete previous data" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> Write(WriteToBigQuery(
table=custom_options.project_target.get() + ":" + custom_options.dataset_target.get() + "." + custom_options.table_target.get(),
schema=custom_options.target_schema.get(),
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
方法 2
我创建了一个 ParDo 函数以获取变量并设置 WriteToBigQuery 方法。然而,尽管管道执行成功完成并且看到输出正在返回行(理论上是写的),但我看不到表或插入的数据。
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Pre-tasks" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> Write(WriteToBigQuery())
我尝试了 2 种方法,但均无济于事:BigQueryBatchFileLoads 和 WriteToBigQuery
class writeTable(beam.DoFn):
def process(self, element):
try:
#Load first here the parameters from the custom_options variable (Here we can do it)
result1 = Write(BigQueryBatchFileLoads(destination=target_table,
schema=target_schema,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
result2 = WriteToBigQuery(table=target_table,
schema=target_schema,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
method="FILE_LOADS"
)
问题 2
我还有一个疑问是,如果在最后一个 ParDo 类中,我需要像在最后一个管道步骤中那样返回元素或 result1 或 result2。
感谢您对此的帮助。
【问题讨论】:
-
* 有关成功执行的更多详细信息:请参阅下面的链接以查看方案 2 中的管道执行工作正常并且返回行,但是 BigQuery 中的表和数据均不可用。 i.stack.imgur.com/GCueP.png
-
* 有关方法 2 的更多详细信息:我在某处读到我需要执行以下步骤,但不确定如何执行:“一旦将其移出 DoFn,您需要应用PTransform beam.io.gcp.bigquery.WriteToBigQuery 到 PCollection 以使其产生任何效果”。对吗?
标签: python templates google-bigquery google-cloud-dataflow apache-beam