【问题标题】:How to write avro output in hadoop map reduce?如何在hadoop map reduce中编写avro输出?
【发布时间】:2014-03-19 12:32:04
【问题描述】:

我写了一个 Hadoop 字数统计程序,它接受 TextInputFormat 输入,并应该以 avro 格式输出字数。

Map-Reduce 作业运行良好,但可以使用诸如 morevi 之类的 unix 命令读取此作业的输出。我期待这个输出是不可读的,因为 avro 输出是二进制格式。

我只使用了 mapper,reducer 不存在。我只是想尝试 avro,所以我不担心内存或堆栈溢出。按照mapper的代码

public class WordCountMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<Integer>> {

    private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] keys = value.toString().split("[\\s-*,\":]");
        for (String currentKey : keys) {
            int currentCount = 1;
            String currentToken = currentKey.trim().toLowerCase();
            if(wordCountMap.containsKey(currentToken)) {
                currentCount = wordCountMap.get(currentToken);
                currentCount++;
            }
            wordCountMap.put(currentToken, currentCount);
        }
        System.out.println("DEBUG : total number of unique words = " + wordCountMap.size());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, Integer> currentKeyValue : wordCountMap.entrySet()) {
            AvroKey<String> currentKey = new AvroKey<String>(currentKeyValue.getKey());
            AvroValue<Integer> currentValue = new AvroValue<Integer>(currentKeyValue.getValue());
            context.write(currentKey, currentValue);
        }
    }
}

驱动代码如下:

public int run(String[] args) throws Exception {

    Job avroJob = new Job(getConf());
    avroJob.setJarByClass(AvroWordCount.class);
    avroJob.setJobName("Avro word count");

    avroJob.setInputFormatClass(TextInputFormat.class);
    avroJob.setMapperClass(WordCountMapper.class);

    AvroJob.setInputKeySchema(avroJob, Schema.create(Type.INT));
    AvroJob.setInputValueSchema(avroJob, Schema.create(Type.STRING));

    AvroJob.setMapOutputKeySchema(avroJob, Schema.create(Type.STRING));
    AvroJob.setMapOutputValueSchema(avroJob, Schema.create(Type.INT));

    AvroJob.setOutputKeySchema(avroJob, Schema.create(Type.STRING));
    AvroJob.setOutputValueSchema(avroJob, Schema.create(Type.INT));


    FileInputFormat.addInputPath(avroJob, new Path(args[0]));
    FileOutputFormat.setOutputPath(avroJob, new Path(args[1]));

    return avroJob.waitForCompletion(true) ? 0 : 1;
}

我想知道 avro 的输出是什么样的,我在这个程序中做错了什么。

【问题讨论】:

    标签: java hadoop avro word-count


    【解决方案1】:

    Avro 库的最新版本包括更新后的 example 其用于 MRv2 的 ColorCount 示例。我建议你看看它,使用与他们在 Reduce 类中使用的模式相同的模式,或者只是 extend AvroMapper。请注意,使用 Pair 类而不是 AvroKey+AvroValue 对于在 Hadoop 上运行 Avro 也是必不可少的。

    【讨论】:

    • 感谢分享示例链接。我的问题解决了,但我还有一个问题。如果我使用其他程序创建的 avro 文件作为映射器的输入,那么 avro 如何将该文件的架构分发给所有映射器?根据我的理解,如果 avro 架构是在 avro 文件的开头编写的,那么除了第一个映射器之外,该架构将不适用于所有映射器。
    • Avro Specification 是这样说的:一个文件有一个模式,并且存储在文件中的所有对象都必须根据该模式写入,使用二进制编码。对象存储在可以压缩的块中。块之间使用同步标记以允许有效地拆分文件以进行 MapReduce 处理。我的理解是,架构每个文件存储一次,但在拆分过程中,我们将架构写入每个部分,标记帮助我们高效地完成。
    • 现在我在使用 Avro1.7.4 版本时遇到了同样的问题。你能说说你是怎么解决这个问题的吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-21
    • 1970-01-01
    相关资源
    最近更新 更多