【问题标题】:How to read CSVRecord in apache beam?如何在 Apache Beam 中读取 CSVRecord?
【发布时间】:2019-07-18 08:24:09
【问题描述】:

我有一个Java Iterable 对象,Iterable 记录。我想将它传递给 Beam 管道。我试过了

PCollection csvRecordPC = p.apply("创建集合", Create.of(records));

导致错误

执行 Java 类时发生异常。无法确定没有元素的“创建”PTransform 的默认编码器。添加元素,调用 Create.empty(Coder)、Create.empty(TypeDescriptor),或者在 PTransform 上调用 'withCoder(Coder)' 或 'withType(TypeDescriptor)'。

我应该使用哪个编码器?或者如何编写我的自定义编码器?

【问题讨论】:

    标签: csv apache-beam apache-commons-csv


    【解决方案1】:

    我找到了使用 FileIO 的解决方案。

    p.apply(FileIO.match().filepattern(options.getInputFile()))
     .apply(FileIO.readMatches())
     .apply(ParDo.of(new CsvParser())) 
    

    CsvPaser() 是

    public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
        @DoFn.ProcessElement
        public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
            InputStream is = Channels.newInputStream(element.open());
    
            Reader reader = new InputStreamReader(is);
    
            Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);
    
            for (CSVRecord record : records) {
                receiver.output(record);
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-10-05
      相关资源
      最近更新 更多