【发布时间】:2019-05-01 07:53:12
【问题描述】:
主要对象
我从 pub/sub 读取输入的 python 流式管道。
分析输入后,有两种选择:
- 如果 x=1 -> 插入
- 如果 x=2 -> 更新
测试
- 使用apache beam函数无法做到这一点,需要使用BigQuery的0.25 API开发(目前是Google Dataflow支持的版本)
问题
-
插入的记录仍在 BigQuery 缓冲区中,因此更新语句失败:
UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
代码
插入
def insertCanonicalBQ(input):
from google.cloud import bigquery
client = bigquery.Client(project='project')
dataset = client.dataset('dataset')
table = dataset.table('table' )
table.reload()
table.insert_data(
rows=[[values]])
更新
def UpdateBQ(input):
from google.cloud import bigquery
import uuid
import time
client = bigquery.Client()
STD= "#standardSQL"
QUERY= STD + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2= 'YYY'"""
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
while True:
query_job.reload() # Refreshes the state via a GET request.
if query_job.state == 'DONE':
if query_job.error_result:
raise RuntimeError(query_job.errors)
print "done"
return input
time.sleep(1)
【问题讨论】:
标签: python google-bigquery google-cloud-dataflow apache-beam