【问题标题】:How to count total number of rows in a file using google dataflow如何使用谷歌数据流计算文件中的总行数
【发布时间】:2016-08-30 21:52:40
【问题描述】:

我想知道是否有办法使用谷歌数据流找出文件中的总行数。任何代码示例和指针都会有很大帮助。基本上,我有一个方法

int getCount(String fileName) {}

因此,上述方法将返回总行数,其实现将是数据流代码。

谢谢

【问题讨论】:

  • 您能否澄清一下文件有多大,以及为什么要为此使用 Dataflow,而不是直接读取文件并逐行计算行数的 Java 程序?除非文件的大小至少有数 GB,并且除非文件已经存储在 Google Cloud Storage 上,否则 Dataflow 很可能不是完成这项工作的最佳工具。
  • 感谢您的关注。是的文件基本上是一个 gz 文件,大小为 GBs 。文件也位于 GCS 存储桶中。除了数据流,你有没有想到其他的方式或者有示例代码,链接给我看看。我可以使用 PCollection(String) 中的数据流从 GCS 存储桶中读取文件并对其应用 Count.Globally 但这又给了我 PCollection(Long),因此无法从我的方法返回单个 long 值。谢谢。
  • @chchrist:正如我所提到的,我已经使用了 Count.Globally 但问题是它还返回 PCollection 我需要我的方法来返回 Long 值。我不确定如何从 PCollection 读取值

标签: google-cloud-dataflow


【解决方案1】:

似乎您的用例不需要分布式处理,因为文件已压缩,因此无法并行读取。但是,您可能仍然会发现使用 Dataflow API 很有用,因为它们易于访问 GCS 和自动解压。

由于您还希望将结果作为实际 Java 对象从管道中获取,因此您需要使用 Direct 运行程序,该运行程序在进程内运行,无需与 Dataflow 服务通信或进行任何分布式处理,但作为回报它提供了将PCollection 提取到Java 对象中的能力:

类似这样的:

PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);

【讨论】:

  • 如果我们使用 DataflowRunner,如何计算输入文件中的行数,因为上述解决方案不适用于我的情况。
  • 您仍然可以使用 Count.globally() 但您必须让您的管道将生成的 1 元素 PCollection 写入一个文件,在管道完成后您可以从您的程序中读取该文件。跨度>
猜你喜欢
  • 1970-01-01
  • 2013-01-14
  • 2022-01-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-11-15
  • 1970-01-01
相关资源
最近更新 更多