【问题标题】:How to write multiple outputs of different formats in a hadoop reducer?如何在 hadoop reducer 中编写不同格式的多个输出?
【发布时间】:2012-11-14 12:26:35
【问题描述】:

如何使用 reducer 中的 MultipleOutputs 类来编写多个输出,每个输出都可以有自己独特的配置? MultipleOutputs javadoc 中有一些文档,但似乎仅限于文本输出。事实证明,MultipleOutputs 可以处理每个输出的输出路径、键类和值类,但尝试使用需要使用其他配置属性的输出格式会失败。

(这个问题已经出现了好几次,但我试图回答它的尝试都被挫败了,因为提问者实际上有一个不同的问题。由于这个问题已经花费了我几天多的调查时间才能回答,所以我正在回答this Meta Stack Overflow question 建议的我自己的问题。)

【问题讨论】:

    标签: hadoop mapreduce cassandra reduce


    【解决方案1】:

    我已经实现了对 Cassandra 的 MultipleOutputs 支持(请参阅 this JIRA ticket,它目前计划在 1.2 中发布。如果您现在需要它,可以在票证中应用补丁。也可以查看 this presentation 的主题其中给出了其用法的示例。

    【讨论】:

    • 酷。您能否详细说明如何让多个 Cassandra 输出(RecordWriters 等)在配置和缩减时间期间不互相干扰?
    • 其实很简单。您唯一需要做的就是将列族的配置键更改为 MultipleOutputs 用来确定其写入位置的相同键,然后允许用户通过指定多个输出或调用 setColumnFamily 来运行作业。 Hadoop 和 Cassandra 中的现有代码从那里正常运行。
    【解决方案2】:

    我浏览了 MultipleOutputs 实现,发现它不支持任何具有 outputDir、键类和值类以外的属性的 OutputFormatType。我尝试编写自己的 MultipleOutputs 类,但失败了,因为它需要在 Hadoop 类的某个地方调用私有方法。

    我只剩下一种解决方法,它似乎适用于所有情况以及输出格式和配置的所有组合:编写我想要使用的 OutputFormat 类的子类(这些结果是可重用的)。这些类了解其他 OutputFormats 正在同时使用,并且知道如何存储它们的属性。该设计利用了这样一个事实,即可以在要求其 RecordWriter 之前使用上下文配置 OutputFormat。

    我已经将它与 Cassandra 的 ColumnFamilyOutputFormat 一起使用:

    package com.myorg.hadoop.platform;
    
    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    
    public abstract class ConcurrentColumnFamilyOutputFormat 
                            extends ColumnFamilyOutputFormat 
                            implements Configurable {
    
    private static String[] propertyName = {
            "cassandra.output.keyspace" ,
            "cassandra.output.keyspace.username" ,
            "cassandra.output.keyspace.passwd" ,
            "cassandra.output.columnfamily" ,
            "cassandra.output.predicate",
            "cassandra.output.thrift.port" ,
            "cassandra.output.thrift.address" ,
            "cassandra.output.partitioner.class"
            };
    
    private Configuration configuration;
    
    public ConcurrentColumnFamilyOutputFormat() {
        super();
    }
    
    public Configuration getConf() {
        return configuration;
    }
    
    public void setConf(Configuration conf) {
    
        configuration = conf;
    
        String prefix = "multiple.outputs." + getMultiOutputName() + ".";
    
        for (int i = 0; i < propertyName.length; i++) {
            String property = prefix + propertyName[i];
            String value = conf.get(property);
            if (value != null) {
                conf.set(propertyName[i], value);
            }
        }
    
    }
    
    public void configure(Configuration conf) {
    
        String prefix = "multiple.outputs." + getMultiOutputName() + ".";
    
        for (int i = 0; i < propertyName.length; i++) {
            String property = prefix + propertyName[i];
            String value = conf.get(propertyName[i]);
            if (value != null) {
                conf.set(property, value);
            }
        }
    
    }
    
    public abstract String getMultiOutputName();
    

    }

    对于您想要用于 reducer 的每个 Cassandra(在本例中)输出,您将拥有一个类:

    package com.myorg.multioutput.ReadCrawled;
    
    import com.myorg.hadoop.platform.ConcurrentColumnFamilyOutputFormat;
    
    public class StrongOutputFormat extends ConcurrentColumnFamilyOutputFormat {
    
        public StrongOutputFormat() {
            super();
        }
    
        @Override
        public String getMultiOutputName() {
            return "Strong";
        }
    
    }
    

    然后你可以在你的 mapper/reducer 配置类中配置它:

        // This is how you'd normally configure the ColumnFamilyOutputFormat
    
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "Partner", "Strong");
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
    
        // This is how you tell the MultipleOutput-aware OutputFormat that
        // it's time to save off the configuration so no other OutputFormat
        // steps all over it.
    
    new StrongOutputFormat().configure(job.getConfiguration());
    
        // This is where we add the MultipleOutput-aware ColumnFamilyOutputFormat
        // to out set of outputs
    
    MultipleOutputs.addNamedOutput(job, "Strong", StrongOutputFormat.class, ByteBuffer.class, List.class);
    

    再举一个例子,FileOutputFormat 的 MultipleOutput 子类使用以下属性:

        private static String[] propertyName = {
            "mapred.output.compression.type" ,
            "mapred.output.compression.codec" ,
            "mapred.output.compress" ,
            "mapred.output.dir"
            };
    

    并且会像上面的ConcurrentColumnFamilyOutputFormat 一样实现,只是它会使用上面的属性。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-03-04
      • 1970-01-01
      • 1970-01-01
      • 2014-03-19
      • 1970-01-01
      • 2013-01-27
      • 2016-09-22
      • 2014-06-04
      相关资源
      最近更新 更多