【发布时间】:2021-12-22 04:32:13
【问题描述】:
我的工作流程
-
使用云功能将数据从 Pub/Sub 流式传输到 BigQuery。
-
数据在流缓冲区中停留 90 分钟,因此我无法执行更新语句。
-
我需要在那个时间之前更新结果列。
请帮忙。
我在“Pub/Sub”中接收数据,然后触发“Cloud functions”将数据插入“BigQuery”
这是代码:
const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
exports.telemetryToBigQuery = (data, context) => {
if (!data.data) {
throw new Error('No telemetry data was provided!');
return;
}
//Data comes in as base64
console.log(`raw data: ${data.data}`);
//Data gets decoded from base64 to string
const dataDataDecode = Buffer.from(data.data, 'base64').toString();
var indexesSemicolons = [];
for (var i = 0; i < dataDataDecode.length; i++) {
if (dataDataDecode[i] === ";") {
indexesSemicolons.push(i);
}
}
if (indexesSemicolons.length == 14) {
const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);
async function insertRowsAsStream() {
// Inserts the JSON objects into my_dataset:my_table.
const datasetId = 'put your dataset here';
const tableId = 'put table id here';
const rows = [
{
Brand: brand,
Model: model,
Result: result
}
];
// Insert data into a table
await bigquery
.dataset(datasetId)
.table(tableId)
.insert(rows);
console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();
} else {
console.log("Invalid message");
return;
}
}
此数据在 BigQuery 流缓冲区中保留大约 90 分钟,但我需要执行更新查询来更改 Result 列。这是不允许的,会导致错误
ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError
我需要一种在 90 分钟缓冲时间之前更新结果的方法。请大家帮帮我。
我在网上阅读了以下页面
我阅读了以下问题的答案,我想我理解他在说什么,但我不知道如何执行它。
如果我是正确的,他是说将我的数据流式传输到临时表中,然后从那里将其放入永久表中。
【问题讨论】:
标签: node.js google-bigquery google-cloud-functions google-cloud-pubsub dml