【Question Title】: Naming part files from reducer output
【Posted】: 2017-03-08 18:28:55
【Question Description】:

I am trying to split a large Avro file in HDFS into multiple Avro files, based on the value of a field in the input records. My mapper, reducer, and driver are shown below.

Everything works so far, except that the output files are named 01-r-00000.avro, 02-r-00000.avro, ...

instead of stock-r-00000.avro, stock-r-00001.avro, ...

What am I missing?

Thanks

Mapper:

  public static class CustomFileSplitMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<String>, AvroValue<GenericRecord>> {
    @Override
    public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
    throws IOException, InterruptedException {
      GenericRecord record = key.datum();
      LOGGER.info(record);
      AvroValue<GenericRecord> outValue = new AvroValue<GenericRecord>(record);
      context.write(new AvroKey<String>((String) record.get("date")), outValue);
    }
  }

Reducer:

  public static class CustomFileSplitReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
    private AvroMultipleOutputs amos;
    private String outputPath;

    @Override
    protected void setup(Context context) {
      outputPath = context.getConfiguration().get("outputPath");
      amos = new AvroMultipleOutputs(context);
    }

    @Override
    public void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context)
    throws IOException, InterruptedException {
      for (AvroValue<GenericRecord> value : values) {
        // key.datum() is the date string written by the mapper; avoid the
        // charset-dependent new String(getBytes()) round trip
        String datePath = "daily" + File.separator + LocalDate.parse(key.datum().toString(),
            DateTimeFormatter.ofPattern("yyyyMMdd")).format(DateTimeFormatter.ofPattern("yyyy/MM/dd"));
        GenericRecord record = value.datum();
        amos.write("stock", new AvroKey<GenericRecord>(record), NullWritable.get(),
        outputPath + File.separator + datePath);
      }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
      amos.close();
    }
  }

Driver:

Configuration conf = new Configuration();
conf.set("outputPath", props.getString("outputPath"));
Job job = Job.getInstance(conf, "CustomFileSplitter");
job.setJarByClass(CustomFileSplitter.class);
job.setMapperClass(CustomFileSplitMapper.class);
job.setReducerClass(CustomFileSplitReducer.class);
FileInputFormat.addInputPath(job, new Path(props.getString("inputPath")));
FileOutputFormat.setOutputPath(job, new Path(props.getString("outputPath")));
job.setInputFormatClass(AvroKeyInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
Schema schema = SchemaExtractor.extract(new Path(props.getString("inputPath")));

AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, schema);
AvroJob.setOutputKeySchema(job, schema);
AvroJob.setInputKeySchema(job, schema);
AvroMultipleOutputs.addNamedOutput(job, "stock", AvroKeyOutputFormat.class, schema);

【Comments】:

    Tags: hadoop mapreduce


    【Solution 1】:

    You have to append the desired prefix to the baseOutputPath you pass to the multiple-outputs write; the last path segment becomes the part-file prefix:

    multipleOutput.write(MULTI_OUTPUT_NAME, output, NullWritable.get(), EVEN_KEY_PATH + "stock");

    See this blog post for details:

    http://bytepadding.com/big-data/map-reduce/multipleoutputs-in-map-reduce/
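    In other words, MultipleOutputs appends `-r-<partition>` (plus the output format's extension, e.g. `.avro`) to whatever the baseOutputPath ends with. In the question's reducer the path ends with the day directory (`.../2017/03/08`), so the day number becomes the prefix, producing `08-r-00000.avro`. This naming rule can be illustrated with plain string logic (the helper below is a hypothetical sketch, not part of the Hadoop API):

    ```java
    public class PartFileNameDemo {

        // Mimics how MultipleOutputs derives the part-file name: the
        // framework appends "-r-<5-digit partition>" directly to the
        // baseOutputPath, so the path's last segment is the prefix.
        static String partFileName(String baseOutputPath, int partition) {
            return String.format("%s-r-%05d", baseOutputPath, partition);
        }

        public static void main(String[] args) {
            // Path ending in the day directory: the day becomes the prefix.
            System.out.println(partFileName("out/daily/2017/03/08", 0));
            // Appending "/stock" yields the desired stock-r-00000 naming.
            System.out.println(partFileName("out/daily/2017/03/08/stock", 0));
        }
    }
    ```

    So in the reducer above, changing the last argument of `amos.write(...)` to `outputPath + File.separator + datePath + File.separator + "stock"` should produce files named stock-r-00000.avro, stock-r-00001.avro, and so on under each date directory.
    
    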

    【Discussion】:
