【问题标题】:Call the Bigquery stored procedure in Dataflow pipeline在 Dataflow 管道中调用 Bigquery 存储过程
【发布时间】:2021-11-06 10:00:03
【问题描述】:

我在 Bigquery 中编写了一个存储过程,并尝试在数据流管道中调用它。这适用于SELECT 查询,但不适用于存储过程:

pipeLine = beam.Pipeline(options=options)
rawdata = ( pipeLine
            | beam.io.ReadFromBigQuery(
               query="CALL my_dataset.create_customer()", use_standard_sql=True)
          )
          pipeLine.run().wait_until_finish()

存储过程:

CREATE OR REPLACE PROCEDURE my_dataset.create_customer()
BEGIN
    SELECT * 
    FROM `project_name.my_dataset.my_table` 
    WHERE customer_name LIKE "%John%"
    ORDER BY created_time
    LIMIT 5;
END;

我能够创建存储过程并在 Bigquery 控制台中调用它。但是,在数据流管道中,它在调用它时会引发错误:

“代码”:400,
"message": "无法为脚本设置 configuration.query.destinationEncryptionConfiguration",

"message": "无法为脚本设置 configuration.query.destinationEncryptionConfiguration", “域”:“全球”,
“原因”:“无效”

“状态”:“INVALID_ARGUMENT”

编辑: Beam 中有没有其他方法可以用来调用 bigquery 中的存储过程

我看到在同一个问题上提出了多个线程,但没有找到答案,所以想发布这个问题。感谢您的帮助。

【问题讨论】:

  • 您能解释一下您在 Apache Beam 管道中调用存储过程的用例吗?

标签: python stored-procedures google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

过程的原则是执行一项工作,不返回任何内容。函数的原理是执行一项工作并返回一些东西。

您不能在 Dataflow 中使用存储过程作为读取,您的错误是正常的。参数化视图在管道中以实现您想要的。目前的解决方案是使用 UDF 或直接在代码中编写查询。


编辑 1

你想做什么?

  • 您要获取数据吗?如果是这样,这不是您必须使用的过程。
  • 您想简单地调用存储过程吗?如果是这样,只需执行 API 调用,使用 BigQuery 客户端库运行调用查询即可。但是您必须更新您的存储过程,因为目前它只是一个投影,并且“对于一个过程”没有用处。

【讨论】:

  • 如果不是 Read 方法,Beam 中还有其他方法可以用来调用存储过程吗?还是根本没有办法?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-02-03
  • 2021-06-09
  • 2018-10-18
  • 2021-06-16
  • 1970-01-01
  • 2019-05-01
  • 1970-01-01
相关资源
最近更新 更多