【问题标题】:Streaming csv data to BigQuery from Pub/Sub subscription using Dataflow使用 Dataflow 将 csv 数据从 Pub/Sub 订阅流式传输到 BigQuery
【发布时间】:2021-03-10 09:50:14
【问题描述】:

使用 GCP 探索 ETL 流程。我在 Dataflow 中使用 Pub/Sub Subscription to BigQuery 模板。

Pub/Subscription 中的消息数据为 csv 格式,如下所示

53466,06/30/2020,,特立尼达和多巴哥,2020-07-01 04:33:52,130.0,8.0,113.0

这会在加载到 BigQuery 表时出现错误。如何在模板中将 CSV 数据转换为 JSON?

【问题讨论】:

  • 你能分享一下错误吗?您能否详细说明如何启动您的模板?

标签: google-cloud-platform google-bigquery google-dataflow google-cloud-pubsublite


【解决方案1】:

我猜你使用了 this template ,它只能用于 Pub/Sub 订阅中的 JSON 格式字符串。文件也这么说。

据我所知,另一种方法是自行为 CSV 流数据自定义 this code

【讨论】:

  • 谢谢,这很有帮助。我得到了以下转换需要添加的功能但是在上面的模板中放置在哪里?
【解决方案2】:

解决了!!

使用 pub/sub 订阅 Bigquery 模板创建作业时,单击查看选项参数。我们可以在哪里设置.js文件路径和UDF函数名。

这里是转换的 JS 代码,即从 CSV 格式到 JSON 格式。

function transform(messages) {
  var values = messages.split(',');

  // Construct output and add transformations
  var obj = new Object();
  obj.SNo = values[0];
  var dateObj = values[1];
  // Date format in file is dd/mm/YYYY
  // Transform the field to Date format required for BigQuery that is YYYY-mm-dd
  obj.ObservationDate = dateObj.replace(/(\d\d)\/(\d\d)\/(\d{4})/, "$3-$1-$2");
  obj.Provision_State = values[2];
  obj.Country_Region = values[3];
  obj.Last_Update = values[4];
  obj.Confirmed = values[5];
  obj.Deaths = values[6];
  obj.Recovered = values[7];
  // add object to JSON
  var jsonString = JSON.stringify(obj);

  return jsonString;
}

【讨论】:

    猜你喜欢
    • 2020-06-22
    • 2018-02-15
    • 2017-05-10
    • 2018-06-21
    • 2021-04-09
    • 2019-10-03
    • 1970-01-01
    • 2021-11-23
    • 2022-12-02
    相关资源
    最近更新 更多