【问题标题】:Google Dataflow: insert + update in BigQuery in a streaming pipelineGoogle Dataflow:在流式管道中的 BigQuery 中插入 + 更新
【发布时间】:2019-05-01 07:53:12
【问题描述】:

主要对象

我从 pub/sub 读取输入的 python 流式管道。

分析输入后,有两种选择:

  • 如果 x=1 -> 插入
  • 如果 x=2 -> 更新

测试

  • 使用apache beam函数无法做到这一点,需要使用BigQuery的0.25 API开发(目前是Google Dataflow支持的版本)

问题

  • 插入的记录仍在 BigQuery 缓冲区中,因此更新语句失败:

         UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
    

代码

插入

def insertCanonicalBQ(input):
    from google.cloud import bigquery
    client = bigquery.Client(project='project')
    dataset = client.dataset('dataset')
    table = dataset.table('table' )
    table.reload()
    table.insert_data(
        rows=[[values]])

更新

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD= "#standardSQL"
    QUERY= STD + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2=  'YYY'"""
    client.use_legacy_sql = False    
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
         query_job.reload()  # Refreshes the state via a GET request.
         if query_job.state == 'DONE':
             if query_job.error_result:
                 raise RuntimeError(query_job.errors)
             print "done"
             return input
             time.sleep(1)

【问题讨论】:

    标签: python google-bigquery google-cloud-dataflow apache-beam


    【解决方案1】:

    即使该行不在流式缓冲区中,这仍然不是 BigQuery 中解决此问题的方法。 BigQuery 存储更适合批量突变,而不是像这样通过UPDATE 突变单个实体。您的模式符合我对事务性用例而非分析性用例的期望。

    为此考虑一个基于附加的模式。每次处理实体消息时,都会通过流式插入将其写入 BigQuery。然后,在需要时,您可以通过查询获取所有实体的最新版本。

    作为示例,让我们假设一个任意模式:idfield 是您唯一的实体键/标识符,message_time 表示发出消息的时间。您的实体可能有许多其他字段。要获取最新版本的实体,我们可以运行以下查询(并可能将其写入另一个表):

    #standardSQL
    SELECT
      idfield,
      ARRAY_AGG(
        t ORDER BY message_time DESC LIMIT 1
      )[OFFSET(0)].* EXCEPT (idfield)
    FROM `myproject.mydata.mytable` AS t
    GROUP BY idfield
    

    这种方法的另一个优点是它还允许您在任意时间点执行分析。要对实体在一小时前的状态进行分析,只需添加一个 WHERE 子句:WHERE message_time <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-11-11
      • 2020-06-09
      • 1970-01-01
      • 2019-11-11
      • 1970-01-01
      • 2022-09-27
      • 1970-01-01
      • 2020-03-10
      相关资源
      最近更新 更多