【发布时间】:2016-04-18 15:48:45
【问题描述】:
我写了 2 个映射器 Map1 和 Map2
Map1- 在 HDFS 中读取一个 seq 文件并对其进行处理。
Map2- 从 HBASE 读取并生成与 Map1 相同的键值对。
最后我将它们合并到 ReducerAll 中。
问题是只有一个映射器正在运行,并且作业完成而没有任何类型的错误。只有最后一个映射器正在运行(即TableMapReduceUtil)。如果我交换行TableMapReduceUtil 和MultipleInputs,那么最后一个即MultipleInputs 映射器运行。
我在这里做错了什么?这两种情况都不会引发错误。我还使用 addCacheFile() 读取了 2 个文件进行处理,但我猜这并不重要。
Job job3 = Job.getInstance(config, "Test");
if (true) {
job3.setJarByClass(Main.class);
job3.setMapOutputKeyClass(ImmutableBytesWritable.class);
job3.setMapOutputValueClass(ImmutableBytesWritable.class);
job3.setOutputKeyClass(ImmutableBytesWritable.class);
job3.setOutputValueClass(ImmutableBytesWritable.class);
job3.getConfiguration().set("StartDate", c_startDate);
job3.getConfiguration().set("EndDate", c_endDate);
job3.addCacheFile(new URI(args[8]));
job3.getConfiguration().set("abc", args[8].substring(args[8].lastIndexOf("/") + 1));
job3.addCacheFile(new URI(args[9]));
job3.getConfiguration().set("xyz", args[9].substring(args[9].lastIndexOf("/") + 1));
job3.setReducerClass(ReducerAll.class);
job3.setOutputFormatClass(SequenceFileOutputFormat.class);
job3.setNumReduceTasks(10);
Scan scan = new Scan();
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("hbasetable"));
scan.setCaching(300);
scan.setCacheBlocks(false);
MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
TableMapReduceUtil.initTableMapperJob(
"hbasetable",
scan,
Map2.class,
ImmutableBytesWritable.class,
ImmutableBytesWritable.class,
job3);
FileOutputFormat.setOutputPath(job3, new Path(args[7]));
job3.waitForCompletion(true);
if (!job3.waitForCompletion(true)) {
return (1);
}
【问题讨论】:
标签: java hadoop mapreduce hbase mapper