【问题标题】:Two mappers with Multiple data sources具有多个数据源的两个映射器
【发布时间】:2016-04-18 15:48:45
【问题描述】:

我写了 2 个映射器 Map1 和 Map2

Map1- 在 HDFS 中读取一个 seq 文件并对其进行处理。

Map2- 从 HBASE 读取并生成与 Map1 相同的键值对。

最后我将它们合并到 ReducerAll 中。

问题是只有一个映射器正在运行,并且作业完成而没有任何类型的错误。只有最后一个映射器正在运行(即TableMapReduceUtil)。如果我交换行TableMapReduceUtilMultipleInputs,那么最后一个即MultipleInputs 映射器运行。

我在这里做错了什么?这两种情况都不会引发错误。我还使用 addCacheFile() 读取了 2 个文件进行处理,但我猜这并不重要。

Job job3 = Job.getInstance(config, "Test");
if (true) {


  job3.setJarByClass(Main.class);


  job3.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job3.setMapOutputValueClass(ImmutableBytesWritable.class);
  job3.setOutputKeyClass(ImmutableBytesWritable.class);
  job3.setOutputValueClass(ImmutableBytesWritable.class);


  job3.getConfiguration().set("StartDate", c_startDate);
  job3.getConfiguration().set("EndDate", c_endDate);


  job3.addCacheFile(new URI(args[8]));
  job3.getConfiguration().set("abc", args[8].substring(args[8].lastIndexOf("/") + 1));

  job3.addCacheFile(new URI(args[9]));
  job3.getConfiguration().set("xyz", args[9].substring(args[9].lastIndexOf("/") + 1));
  job3.setReducerClass(ReducerAll.class);
  job3.setOutputFormatClass(SequenceFileOutputFormat.class);

  job3.setNumReduceTasks(10);

  Scan scan = new Scan();
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("hbasetable"));
  scan.setCaching(300);
  scan.setCacheBlocks(false);

  MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
  TableMapReduceUtil.initTableMapperJob(
          "hbasetable",
          scan,
          Map2.class,
          ImmutableBytesWritable.class,
          ImmutableBytesWritable.class,
          job3);


  FileOutputFormat.setOutputPath(job3, new Path(args[7]));
  job3.waitForCompletion(true);
  if (!job3.waitForCompletion(true)) {
    return (1);
  }

【问题讨论】:

    标签: java hadoop mapreduce hbase mapper


    【解决方案1】:

    我相信这种行为是因为以下两行:-

    MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
     TableMapReduceUtil.initTableMapperJob(
          "hbasetable",
          scan,
          Map2.class,
          ImmutableBytesWritable.class,
          ImmutableBytesWritable.class,
          job3); 
    
    1. 只有一份工作job3
    2. 虽然您提到有两个映射器,但请查看映射器类型。 Map1 中的映射器与Map2不同Map1MapperMap2TableMapper
    3. 将这两个语句放在一起并不意味着它们本质上被合并到 job3 的 MultipleInputs setup 中。 MultipleInputs 仍然只有Map1 的一个设置。 Map2 的其他设置仍然是独立的。
    4. 现在执行。两种配置 MultipleInputs 或 TableMapReduceUtil 中的后者覆盖 job3 中的前一个配置,因此只有一个映射器执行

    PS:- 如果这不正确,请告诉我,我尚未验证我在我的机器上提出的理解。

    【讨论】:

    • 啊!我现在知道了。那么,如果我想同时运行两个映射器并将其定向到同一个映射器,我应该定义另一个作业吗?但是,那我怎么能把它交给同一个减速器呢?
    • 指向同一个reducer*
    • @gambit 我认为我们可以使用三个工作。前两个作业只有映射器,第三个作业(合并步骤)具有一个普通的映射器,实际上将输入写入输出“原样”,然后是一个实际上对这些文件执行合并任务的减速器。我仍在尝试想办法以更好的方式做到这一点(这会带来更好的性能)。请让我知道这可不可以帮你。如果我发现更好的东西会发布。
    • stackoverflow.com/questions/17456369/… 答案应该是有效的,并且 OP 使用了相同的工作。该评论有一个用户面临我的问题。对此有什么想法吗?
    • 我的错。你的情况不同。有那么一刻,他的解决方案似乎也可以用于这种情况。删除旧评论以避免任何混淆。
    猜你喜欢
    • 2021-10-28
    • 1970-01-01
    • 1970-01-01
    • 2013-03-20
    • 1970-01-01
    • 2017-10-25
    • 2016-01-04
    • 2020-10-05
    • 1970-01-01
    相关资源
    最近更新 更多