【问题标题】:How to update data in the BigQuery stream buffer?如何更新 BigQuery 流缓冲区中的数据?
【发布时间】:2021-12-22 04:32:13
【问题描述】:

我的工作流程

  • 使用云功能将数据从 Pub/Sub 流式传输到 BigQuery。

  • 数据在流缓冲区中停留 90 分钟,因此我无法执行更新语句。

  • 我需要在那个时间之前更新结果列。

    请帮忙。

我在“Pub/Sub”中接收数据,然后触发“Cloud functions”将数据插入“BigQuery

这是代码:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();


exports.telemetryToBigQuery = (data, context) => {

  if (!data.data) {
    throw new Error('No telemetry data was provided!');
    return;
  }

  //Data comes in as base64
  console.log(`raw data: ${data.data}`);

  //Data gets decoded from base64 to string
  const dataDataDecode = Buffer.from(data.data, 'base64').toString();


var indexesSemicolons = [];

for (var i = 0; i < dataDataDecode.length; i++) {
    if (dataDataDecode[i] === ";") {
        indexesSemicolons.push(i);
    }
}

if (indexesSemicolons.length == 14) {

     const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
     const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
     const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);

    async function insertRowsAsStream() {
      // Inserts the JSON objects into my_dataset:my_table.

    
      const datasetId = 'put your dataset here';
      const tableId = 'put table id here';
      const rows = [
        {
          Brand: brand,
          Model: model,
          Result: result

        }
      ];

      // Insert data into a table
      await bigquery
        .dataset(datasetId)
        .table(tableId)
        .insert(rows);
      console.log(`Inserted ${rows.length} rows`);
    }
    insertRowsAsStream();
  } else {
    console.log("Invalid message");
    return;
  }
}

此数据在 BigQuery 流缓冲区中保留大约 90 分钟,但我需要执行更新查询来更改 Result 列。这是不允许的,会导致错误

ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError

我需要一种在 90 分钟缓冲时间之前更新结果的方法。请大家帮帮我。

我在网上阅读了以下页面

Life of a BigQuery stream

我阅读了以下问题的答案,我想我理解他在说什么,但我不知道如何执行它。

如果我是正确的,他是说将我的数据流式传输到临时表中,然后从那里将其放入永久表中。

Stackoverflow DML Update bigQuery

【问题讨论】:

    标签: node.js google-bigquery google-cloud-functions google-cloud-pubsub dml


    【解决方案1】:

    是的,没错。流式传输数据时,您不能使用 DML。解决方案是查询流缓冲区中的数据并将它们转换到另一个表中。正如您所说,这可能是暂时的,然后将它们沉入永久的桌子中。

    您还可以认为来自 PubSub 的流式数据是原始数据,并且您希望保留它们,然后您需要在另一个表中细化数据。这也是一种常见的数据工程模式,并具有不同层的过滤和转换,直至最终和有用的数据(也称为数据集市)

    【讨论】:

      【解决方案2】:

      回答你的问题。是的,它说您应该将数据流式传输到临时表并将其复制到另一个永久表,并且在原始表中您可以启用过期时间。这意味着该表将在过期时间过后被删除。

      您可以更改过滤器,使其不包含可能位于当前流缓冲区中的数据。如果您在更新数据时使用分区表,您可以添加一个WHERE 子句,其中时间戳的间隔为 40 到 90 分钟,例如:

      WHERE Partitiontime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE).
      

      【讨论】:

      • 我不认为这对我有用。因为在发布/订阅之后。一个文件在 30 到 200 分钟后上传到云存储(取决于发生了什么),然后触发一个带有更新语句的云函数和结果(失败或通过)。如果那时 Pub/Sub 数据仍在流缓冲区中,则结果永远不会更新。还是我不理解这个 Where 语句?
      • 我明白你告诉我的和你说的都是正确的,所以它不适用于你的环境,即使数据更新时的 where 语句,它只允许已经进入的数据桌子超过40分钟。因此,如果继续处理流,它将无法在您的环境中工作。
      【解决方案3】:

      我现在正在使用 BigQuery 创建作业方法。

      找到的例子here

      我直接将数据放入表中,因此我不必等待 90 分钟的流式缓冲区。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-10-25
        • 2017-08-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-02-28
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多