【问题标题】:Hadoop/MapReduce: Reading and writing classes generated from DDLHadoop/MapReduce:读取和写入从 DDL 生成的类
【发布时间】:2010-05-16 21:48:47
【问题描述】:

有人可以指导我了解使用从 DDL 生成的类读取和写入数据的基本工作流程吗?

我已经使用 DDL 定义了一些类似结构的记录。例如:

  class Customer {
     ustring FirstName;
     ustring LastName;
     ustring CardNo;
     long LastPurchase;
  }

我已经编译了这个来获得一个 Customer 类并将它包含到我的项目中。我可以很容易地看到如何将其用作映射器和化简器的输入和输出(生成的类实现 Writable),但不知道如何将其读取和写入文件。

org.apache.hadoop.record 包的 JavaDoc 讨论了以二进制、CSV 或 XML 格式序列化这些记录。我该怎么做?假设我的减速器产生 IntWritable 键和客户值。我使用什么 OutputFormat 以 CSV 格式写入结果?如果我想对它们进行分析,我以后会使用什么 InputFormat 来读取结果文件?

【问题讨论】:

    标签: hadoop mapreduce ddl


    【解决方案1】:

    好的,所以我想我已经弄清楚了。我不确定这是否是最直接的方式,所以如果您知道更简单的工作流程,请纠正我。

    从 DDL 生成的每个类都实现了 Record 接口,因此提供了两种方法:

    serialize(RecordOutput out) 用于写入 deserialize(RecordInput in) 用于读取

    RecordOutputRecordInputorg.apache.hadoop.record 包中提供的实用程序接口。有一些实现(例如 XMLRecordOutputBinaryRecordOutputCSVRecordOutput

    据我所知,您必须实现自己的 OutputFormatInputFormat 类才能使用它们。这很容易做到。

    例如,我在原始问题中谈到的 OutputFormat(以 CSV 格式写入整数键和客户值)将这样实现:

    
      private static class CustomerOutputFormat 
        extends TextOutputFormat<IntWritable, Customer> 
      {
    
        public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
          JobConf job,
          String name,
          Progressable progress)
        throws IOException {
          Path file = FileOutputFormat.getTaskOutputPath(job, name);
          FileSystem fs = file.getFileSystem(job);
          FSDataOutputStream fileOut = fs.create(file, progress);
          return new CustomerRecordWriter(fileOut);
        }   
    
        protected static class CustomerRecordWriter 
          implements RecordWriter<IntWritable, Customer> 
        {
    
          protected DataOutputStream outStream ;
    
          public AnchorRecordWriter(DataOutputStream out) {
            this.outStream = out ; 
          }
    
          public synchronized void write(IntWritable key, Customer value) throws IOException {
    
            CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
            csvOutput.writeInteger(key.get(), "id") ;
            value.serialize(csvOutput) ; 
          }
    
          public synchronized void close(Reporter reporter) throws IOException {
            outStream.close();
          }
        }
      }
    

    创建 InputFormat 大致相同。因为 csv 格式是每行一个条目,所以我们可以在内部使用 LineRecordReader 来完成大部分工作。

    
    
    private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {
    
      public RecordReader<IntWritable, Customer> getRecordReader(
        InputSplit genericSplit, 
        JobConf job,
        Reporter reporter)
      throws IOException {
    
        reporter.setStatus(genericSplit.toString());
        return new CustomerRecordReader(job, (FileSplit) genericSplit);
      }
    
      private class CustomerRecordReader implements RecordReader<IntWritable, Customer> {
    
        private LineRecordReader lrr ;
    
        public CustomerRecordReader(Configuration job, FileSplit split) 
        throws IOException{
          this.lrr = new LineRecordReader(job, split);    
        }
    
        public IntWritable createKey() {
          return new IntWritable();
        }
    
        public Customer createValue() {
          return new Customer();
        }
    
        public synchronized boolean next(IntWritable key, Customer value)
        throws IOException {
    
          LongWritable offset = new LongWritable() ;
          Text line = new Text() ;
    
          if (!lrr.next(offset, line))
            return false ;
    
          CsvRecordInput cri = new CsvRecordInput(new      
            ByteArrayInputStream(line.toString().getBytes())) ;
          key.set(cri.readInt("id")) ;
          value.deserialize(cri) ;
    
          return true ;
        }
    
        public float getProgress() {
          return lrr.getProgress() ;
        }
    
        public synchronized long getPos() throws IOException {
          return lrr.getPos() ;
        }
    
        public synchronized void close() throws IOException {
          lrr.close();
        }
      }
    }
    
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-27
      • 2021-08-11
      • 1970-01-01
      相关资源
      最近更新 更多