【问题标题】:Hadoop - constructor args for mapperHadoop - 映射器的构造函数参数
【发布时间】:2011-12-29 19:23:18
【问题描述】:

有没有办法将构造函数参数提供给 Hadoop 中的映射器?可能通过一些包含 Job 创建的库?

这是我的场景:

public class HadoopTest {

    // Extractor turns a line into a "feature"
    public static interface Extractor {
        public String extract(String s);
    }

    // A concrete Extractor, configurable with a constructor parameter
    public static class PrefixExtractor implements Extractor {
        private int endIndex;

        public PrefixExtractor(int endIndex) { this.endIndex = endIndex; }

        public String extract(String s) { return s.substring(0, this.endIndex); }
    }

    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Extractor extractor;

        // Constructor configures the extractor
        public Map(Extractor extractor) { this.extractor = extractor; }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String feature = extractor.extract(value.toString());
            context.write(new Text(feature), new Text(value.toString()));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) context.write(key, val);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "test");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

应该清楚,由于 Mapper 仅作为类引用 (Map.class) 提供给 Configuration,因此 Hadoop 无法传递构造函数参数并配置特定的 Extractor。

有 Hadoop 包装框架,例如 Scoobi、Crunch、 Scrunch(可能还有更多我不知道的)似乎具有这种能力,但我不知道它们是如何实现的。 编辑: 在与 Scoobi 合作了一些之后,我发现我在这方面有部分错误。如果您在“映射器”中使用外部定义的对象,Scoobi 要求它是可序列化的,如果不是,则会在运行时报错。所以也许正确的方法就是让我的Extractor可序列化并在Mapper的设置方法中反序列化它......

另外,我实际上在 Scala 工作,因此绝对欢迎基于 Scala 的解决方案(如果不鼓励的话!)

【问题讨论】:

    标签: java scala hadoop


    【解决方案1】:

    我建议通过您正在创建的 Configuration 对象告诉您的映射器使用哪个提取器。映射器在其setup 方法(context.getConfiguration())中接收配置。似乎您不能将对象放入配置中,因为它通常是从 XML 文件或命令行构造的,但您可以设置一个枚举值并让映射器自己构造其提取器。在创建映射器后对其进行自定义不是很漂亮,但这就是我解释 API 的方式。

    【讨论】:

    • +1。这基本上就是我的团队的做法。 (我们使用旧的 API,所以对我们来说是 MapReduceBase.configure(JobConf),但新 API 的 Mapper.setup(Context) 是同样的想法。)
    • 是的,虽然这可行,但它需要预先加载 Mapper 及其可能需要的所有可能配置。由于这种策略会使应用程序非常不灵活,我想避免它。
    • 确实,如果你想参数化提取策略,这种模式并不漂亮——你必须做一个switch(configuration.get("strategy")) { }或类似的东西,并提前知道所有可能的类。但是,我喜欢将映射器视为带有一组有限选项的命令行实用程序。您可以根据需要设计提取器类层次结构,并使映射器成为它的包装器。 (但是,该配置非常适合传递诸如“prefixLength=3”之类的参数,而不是像您的答案那样通过子类化来实现。)
    • 对,这里的例子有点琐碎,配置将是一个简单的解决方案。但是在我的实际应用程序中,我希望能够编写任意复杂的Extractor 各种形状和大小的类,以在不同的场景中运行实验。如果我必须为Extractor 可能性的每个组合设置一个枚举,那么事情很快就会失控。
    【解决方案2】:

    在提交作业时设置实现类名

    Configuration conf = new Configuration();
    conf.set("PrefixExtractorClass", "com.my.class.ThreePrefixExtractor");
    

    或使用命令行中的-D option 来设置 PrefixExtractorClass 选项。

    下面是映射器中的实现

    Extractor extractor = null;
    protected void setup(Context context) throws IOException,
                InterruptedException
    {
        try {
            Configuration conf = context.getConfiguration();
            String className = conf.get("PrefixExtractorClass");
            extractor = Class.forName(className);
        } Catch (ClassNotFoundException e) {
            //handle the exception
        }
    }
    

    现在根据需要在地图函数中使用extractor 对象。

    • 包含com.my.class.ThreePrefixExtractor 类的jar 应该分发到所有节点。这是来自 Cloudera 的 article,介绍了执行此操作的不同方式。

    • 在上面的示例中,com.my.class.ThreePrefixExtractor 应该扩展 Extractor 类。

    使用这种方法可以使映射器实现通用化。这是大多数框架采用的方法(使用 Class.forName)来拥有实现特定接口的可插入组件。

    【讨论】:

    • 这并不能解决问题;它只会移动它。我想通过允许在运行时注入一个 Extractor 实例来避免每个潜在的 Extractor 实现都有一个单独的类。
    • 不应该是Class.forName(...).getInstance()吗?
    【解决方案3】:

    我仍在寻找好的答案,但我想出的一个(非理想)解决方案是使用继承而不是组合,将 Map 转换为 Extractor 抽象类。然后它可以一直被子类化以合并所有构造函数参数(如下所示)。

        public static abstract class Extractor extends Mapper<Object, Text, Text, Text> {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String feature = extract(value.toString());
                context.write(new Text(feature), new Text(value.toString()));
            }
    
            public abstract String extract(String s);
        }
    
        public static abstract class PrefixExtractor extends Extractor {
            public String extract(String s) { return s.substring(0, getEndIndex()); }
    
            public abstract int getEndIndex();
        }
    
        public static class ThreePrefixExtractor extends PrefixExtractor {
            public int getEndIndex() { return 3; }
        }
    

    但是,这并不是那么好,我真的觉得必须有一种方法可以正确地做到这一点。

    (我将这个问题从原来的问题中移出,以使事情变得不那么混乱。)

    【讨论】:

      【解决方案4】:

      到目前为止,我想出的最佳解决方案是将我想要的对象的序列化版本传递给 Mapper,并在运行时使用反射构造对象。

      所以,main 方法会这样说:

      conf.set("ExtractorConstructor", "dicta03.hw4.PrefixExtractor(3)");
      

      然后,在 Mapper 中,我们使用辅助函数 construct(定义如下)并可以说:

      public void setup(Context context) {
          try {
              String constructor = context.getConfiguration().get("ExtractorConstructor");
              this.extractor = (Extractor) construct(constructor);
          } catch (Exception e) {
              throw new RuntimeException(e);
          }
      }
      

      construct 的定义,它使用反射在运行时从字符串递归构造对象:

      public static Object construct(String s) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
          if (s.matches("^[A-Za-z0-9.#]+\\(.*\\)$")) {
              Class cls = null;
              List<Object> argList = new ArrayList<Object>();
              int parenCount = 0;
              boolean quoted = false;
              boolean escaped = false;
              int argStart = -1;
              for (int i = 0; i < s.length(); i++) {
                  if (escaped) {
                      escaped = false;
                  } else if (s.charAt(i) == '\\') {
                      escaped = true;
                  } else if (s.charAt(i) == '"') {
                      quoted = true;
                  } else if (!quoted) {
                      if (s.charAt(i) == '(') {
                          if (cls == null)
                              cls = Class.forName(s.substring(0, i));
                          parenCount++;
                          argStart = i + 1;
                      } else if (s.charAt(i) == ')') {
                          if (parenCount == 1)
                              argList.add(construct(s.substring(argStart, i)));
                          parenCount--;
                      } else if (s.charAt(i) == ',') {
                          if (parenCount == 1) {
                              argList.add(construct(s.substring(argStart, i)));
                              argStart = i + 1;
                          }
                      }
                  }
              }
      
              Object[] args = new Object[argList.size()];
              Class[] argTypes = new Class[argList.size()];
              for (int i = 0; i < argList.size(); i++) {
                  argTypes[i] = argList.get(i).getClass();
                  args[i] = argList.get(i);
              }
              Constructor constructor = cls.getConstructor(argTypes);
              return constructor.newInstance(args);
          } else if (s.matches("^\".*\"$")) {
              return s.substring(1, s.length() - 1);
          } else if (s.matches("^\\d+$")) {
              return Integer.parseInt(s);
          } else {
              throw new RuntimeException("Cannot construct " + s);
          }
      }
      

      (这可能不是最强大的解析器,但它可以很容易地扩展到涵盖更多类型的对象。)

      【讨论】:

      • @dhj - 除了您向要实例化的类发送其他参数之外,它与我在回复中提到的解决方案是否或多或少相同?
      • 是的。起初,您的想法似乎太有限,并且必须有一个更通用的解决方案。所以我想了一会儿,想出了这个。然后回头看看你的,我发现我基本上已经提出了你提出的扩展模型。
      • @dhj - pass a serialized version of the object - 我没有看到任何 ser/deser - 发送了一个类名和一些额外的参数,并使用 Class#forName 创建了一个类。
      【解决方案5】:

      有关另一个类似的解决方案,请查看:

      https://github.com/NICTA/scoobi/blob/master/src/main/scala/com/nicta/scoobi/impl/rtt/ClassBuilder.scala

      我们如何做到这一点。它使用反射来构建一些 java 源代码,在运行时会创建一个相同的对象图。然后我们编译该源代码(使用 javassist)并包含在发送到集群的 jar 中。

      如果你想把它顶起来,它非常健壮,它可以处理循环对象图和所有特殊情况(有相当多的情况)之类的东西。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-10-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-01-26
        • 2023-03-15
        相关资源
        最近更新 更多