【问题标题】:Streaming Insert/Update in Google Cloud - BigQueryGoogle Cloud 中的流式插入/更新 - BigQuery
【发布时间】:2020-09-10 13:07:18
【问题描述】:

我正在尝试将 Salesforce 数据流式传输到 Google Cloud Bigquery。管理实现插入流,例如:每当在 SF 中创建新的潜在客户时,它都会被插入到 Biguery 表中。检查一下,有没有办法可以获取 Upsert 数据。我知道有一个流缓冲区不允许对插入的数据执行任何 DML 操作,因为这些数据将在流缓冲区上短时间。

非常感谢关于 Upsert 部分的任何提示

已编辑 - 2019 年 6 月 6 日

使用下方云端功能插入记录

    /**
     * Responds to any HTTP request.
     *
     * @param {!express:Request} req HTTP request context.
     * @param {!express:Response} res HTTP response context.
     */
    exports.helloWorld = (req, res) => {
    let message = req.query.mes || req.body.mes || 'Hello World!';
    res.status(200).send(req.body);
    var d =JSON.stringify(req.body);
      console.log(d);
    var e = d.replace(/:""/g, '');
  var f = e.replace(/\\/g, '');
  var g = f.replace(/"{n /g, '');
  var h = g.replace(/n}"/g, '');
  var i = h.replace(/n /g, '');
  console.log(i);
  const {BigQuery} = require('@google-cloud/bigquery');
  const bigquery = new BigQuery();
  var instance = "DEMO";
  var table = "HTTP";
  bigquery
    .dataset(instance)
    .table(table)
    .insert(JSON.parse(i),
    {'ignoreUnknownValues':true, 'raw':false})
    .then ((data) => {
      console.log('Inserted 1 rows');
      console.log(data);
    })
    };

【问题讨论】:

    标签: google-cloud-platform streaming


    【解决方案1】:

    适用于更新的代码

    exports.helloWorld = (req, res) => {
      let message = req.query.mes || req.body.mes || 'Hello World!';
      res.status(200).send(req.body);
      var d =JSON.stringify(req.body);
      console.log(d);
      var e = d.replace(/:""/g, '');
      var f = e.replace(/\\/g, '');
      var g = f.replace(/"{n /g, '');
      var h = g.replace(/n}"/g, '');
      var i = h.replace(/n /g, '');
      console.log(i);
      var j = JSON.parse(i);
      var k = JSON.stringify(j.Id);
      var id = k.replace(/"/g, '');
      console.log(k);
      console.log(id);
      const {BigQuery} = require('@google-cloud/bigquery');
      const bigquery = new BigQuery();
      var instance = "DEMO";
      var table = "LEADS_STG";
      bigquery
        .dataset(instance)
        .table(table)
        .insert(JSON.parse(i),
        {'ignoreUnknownValues':true, 'raw':false})
        .then ((data) => {
          console.log('Inserted 1 rows');
          console.log(data);
        })
      /*const bigqueryClient = new BigQuery();*/
      var delayInMilliseconds = 1000;
      setTimeout(function() {
      bigquery.query({
      query: [
        'MERGE DEMO.LEADS_D T USING  (SELECT ID,NAME,LEADSOURCE,COMPANY FROM DEMO.LEADS_STG where ID= ? order by LASTMODIFIEDDATE DESC LIMIT 1) S ON  T.ID = S.ID  WHEN MATCHED  THEN  UPDATE   SET  NAME = S.NAME,  LEADSOURCE = S.LEADSOURCE,  COMPANY = S.COMPANY  WHEN NOT MATCHED  THEN INSERT  ( ID,    NAME,    LEADSOURCE,    COMPANY) VALUES( ID, NAME,LEADSOURCE,COMPANY)'  
      ].join(' '),
      params: [
        id
      ]
    }, function(err, rows) {});
        }, delayInMilliseconds);
    };
    

    【讨论】:

      猜你喜欢
      • 2019-05-01
      • 2021-12-07
      • 1970-01-01
      • 2018-05-13
      • 2020-06-09
      • 2021-02-16
      • 2019-08-18
      • 1970-01-01
      • 2015-07-31
      相关资源
      最近更新 更多