【问题标题】:How do I read a .csv file into a GCP Dataflow and then get the count for a specific column and write it to BigQuery?如何将 .csv 文件读入 GCP 数据流,然后获取特定列的计数并将其写入 BigQuery?
【发布时间】:2020-05-07 05:00:56
【问题描述】:

我需要将一个 csv 文件读入代表一个表的 DataFlow,执行 GroupBy 转换以获取特定列中的元素数量,然后将该数字与原始文件一起写入 BigQuery 表。

到目前为止,我已经迈出了第一步 - 从我的存储桶中读取文件并调用了转换,但我不知道如何获取单个列的计数,因为 csv 有 16 个。

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("gs://bucket/data.csv"));
    PCollection<String> grouped_lines = lines.apply(GroupByKey())

    PCollection<java.lang.Long> count = grouped_lines.apply(Count.globally())

    p.run();
  }
}

【问题讨论】:

  • 你能更好地指定你想要做的计数吗?
  • 另一种方法是将文件按原样写入 BigQuery,然后只需编写一条 SQL 语句即可获取计数并将其保存为表。比写GroupBy 等容易得多。您必须在所有事情上都使用 Dataflow 吗?
  • @GrahamPolley 不错的选择。此外,您是否知道您在组中使用哪个键?我建议您拆分 CSV 行并构建一个 KV 元素,使用您想要的键和 V 行。

标签: google-cloud-platform google-bigquery google-cloud-dataflow pipeline apache-beam


【解决方案1】:

您正在从 CSV 读取整行到字符串上的 PCollection。这对你来说很可能还不够。

你想做的是

  1. 将整个字符串拆分为与列相关的多个字符串
  2. 将 PCollection 过滤为在必填列中包含某些内容的值。 [1]
  3. 申请次数 [2]

[1]https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/Filter.html [2]https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/Count.html

【讨论】:

    【解决方案2】:

    如果你把那个 csv 转换成合适的形式会更好。例如:将其转换为 TableRow,然后基于 GroupByKey 执行。通过这种方式,您可以识别与特定值相对应的列,并据此找到计数。

    【讨论】:

      猜你喜欢
      • 2022-12-24
      • 1970-01-01
      • 2019-06-28
      • 2023-02-21
      • 2019-10-05
      • 2014-12-10
      • 1970-01-01
      • 1970-01-01
      • 2022-01-18
      相关资源
      最近更新 更多