【发布时间】:2021-11-06 10:00:03
【问题描述】:
我在 Bigquery 中编写了一个存储过程,并尝试在数据流管道中调用它。这适用于SELECT 查询,但不适用于存储过程:
pipeLine = beam.Pipeline(options=options)
rawdata = ( pipeLine
| beam.io.ReadFromBigQuery(
query="CALL my_dataset.create_customer()", use_standard_sql=True)
)
pipeLine.run().wait_until_finish()
存储过程:
CREATE OR REPLACE PROCEDURE my_dataset.create_customer()
BEGIN
SELECT *
FROM `project_name.my_dataset.my_table`
WHERE customer_name LIKE "%John%"
ORDER BY created_time
LIMIT 5;
END;
我能够创建存储过程并在 Bigquery 控制台中调用它。但是,在数据流管道中,它在调用它时会引发错误:
“代码”:400,
"message": "无法为脚本设置 configuration.query.destinationEncryptionConfiguration","message": "无法为脚本设置 configuration.query.destinationEncryptionConfiguration", “域”:“全球”,
“原因”:“无效”“状态”:“INVALID_ARGUMENT”
编辑: Beam 中有没有其他方法可以用来调用 bigquery 中的存储过程?
我看到在同一个问题上提出了多个线程,但没有找到答案,所以想发布这个问题。感谢您的帮助。
【问题讨论】:
-
您能解释一下您在 Apache Beam 管道中调用存储过程的用例吗?
标签: python stored-procedures google-bigquery google-cloud-dataflow apache-beam