[Posted]: 2015-06-18 10:43:04
[Question]:
I have written a MapReduce job that reads data from a file and inserts it into an HBase table. The problem I am facing is that only one record gets inserted into the HBase table. I am not sure whether it is the last record or some random record, since my input file is about 10 GB; by the logic I have written, I am certain thousands of records should be inserted into the table. I am sharing only the reducer code and the driver class, because I am fairly sure the problem lies there. Please find the code below:
public static class Reduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Set<Text> uniques = new HashSet<Text>();
        String vis = key.toString();
        String[] arr = vis.split(":");
        Put put = null;
        for (Text val : values) {
            if (uniques.add(val)) {
                put = new Put(arr[0].getBytes());
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("column"), Bytes.toBytes(val.toString()));
            }
            context.write(new ImmutableBytesWritable(arr[0].getBytes()), put);
        }
    }
}
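One thing worth noting about the reducer above: every `Put` in the loop uses `arr[0]` (derived from the group key) as the row key, and always the same `cf:column` cell. Within one reduce group, all writes therefore target the same HBase cell, and by default HBase keeps only the latest version of a cell, so repeated puts collapse into a single visible value. The following is a minimal, HBase-free sketch of that overwrite semantics — a plain `HashMap` stands in for the table, and the row key and values are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class OverwriteDemo {
    public static void main(String[] args) {
        // Stand-in for the HBase table: (rowKey + column) -> latest value.
        Map<String, String> table = new HashMap<>();
        String rowKey = "user1";                 // hypothetical; plays the role of arr[0]
        String[] values = {"a", "b", "c"};       // hypothetical values of one reduce group

        // Same row key and same column for every value, as in the reducer above:
        for (String val : values) {
            table.put(rowKey + "/cf:column", val);  // each put overwrites the previous one
        }

        // Only one cell survives, holding the final value.
        System.out.println(table.size());                   // prints 1
        System.out.println(table.get("user1/cf:column"));   // prints c
    }
}
```

If this is the cause, giving each record a distinct row key (or a distinct column qualifier) would make the inserts stop overwriting each other.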
My driver class:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "Blank");
job.setJarByClass(Class_name.class);
job.setMapperClass(Map.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setSortComparatorClass(CompositeKeyComprator.class);

Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);

job.setReducerClass(Reduce.class);
TableMapReduceUtil.initTableReducerJob("Table_name", Reduce.class, job);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
After running the program, the console shows Reduce output records = 73579, yet only one record is inserted into the table.
15/06/19 16:32:41 INFO mapred.JobClient: Job complete: job_201506181703_0020
15/06/19 16:32:41 INFO mapred.JobClient: Counters: 28
15/06/19 16:32:41 INFO mapred.JobClient: Map-Reduce Framework
15/06/19 16:32:41 INFO mapred.JobClient: Spilled Records=147158
15/06/19 16:32:41 INFO mapred.JobClient: Map output materialized bytes=6941462
15/06/19 16:32:41 INFO mapred.JobClient: Reduce input records=73579
15/06/19 16:32:41 INFO mapred.JobClient: Virtual memory (bytes) snapshot=7614308352
15/06/19 16:32:41 INFO mapred.JobClient: Map input records=140543
15/06/19 16:32:41 INFO mapred.JobClient: SPLIT_RAW_BYTES=417
15/06/19 16:32:41 INFO mapred.JobClient: Map output bytes=6794286
15/06/19 16:32:41 INFO mapred.JobClient: Reduce shuffle bytes=6941462
15/06/19 16:32:41 INFO mapred.JobClient: Physical memory (bytes) snapshot=892702720
15/06/19 16:32:41 INFO mapred.JobClient: Reduce input groups=1
15/06/19 16:32:41 INFO mapred.JobClient: Combine output records=0
15/06/19 16:32:41 INFO mapred.JobClient: Reduce output records=73579
15/06/19 16:32:41 INFO mapred.JobClient: Map output records=73579
15/06/19 16:32:41 INFO mapred.JobClient: Combine input records=0
15/06/19 16:32:41 INFO mapred.JobClient: CPU time spent (ms)=10970
15/06/19 16:32:41 INFO mapred.JobClient: Total committed heap usage (bytes)=829947904
15/06/19 16:32:41 INFO mapred.JobClient: File Input Format Counters
15/06/19 16:32:41 INFO mapred.JobClient: Bytes Read=204120920
15/06/19 16:32:41 INFO mapred.JobClient: FileSystemCounters
15/06/19 16:32:41 INFO mapred.JobClient: HDFS_BYTES_READ=204121337
15/06/19 16:32:41 INFO mapred.JobClient: FILE_BYTES_WRITTEN=14198205
15/06/19 16:32:41 INFO mapred.JobClient: FILE_BYTES_READ=6941450
15/06/19 16:32:41 INFO mapred.JobClient: Job Counters
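The counter `Reduce input groups=1` in the log above is a useful clue: all 73,579 map outputs were delivered to the reducer as a single group, even though `Reduce output records=73579`. The driver sets a custom sort comparator (`CompositeKeyComprator`), and in Hadoop that comparator is also used for grouping when no separate grouping comparator is set; if it ever returns 0 for keys that should be distinct, those keys merge into one reduce group, and the reducer's single `arr[0]` row key then funnels every record into one HBase row. A small sketch, using a `TreeMap` to stand in for the framework's grouping and hypothetical composite keys, shows how a comparator that treats keys as equal collapses groups:

```java
import java.util.Comparator;
import java.util.TreeMap;

public class GroupingDemo {
    public static void main(String[] args) {
        // A comparator that (incorrectly) treats all keys as equal,
        // the way a buggy sort/grouping comparator can.
        Comparator<String> collapseAll = (a, b) -> 0;

        // TreeMap groups entries by comparator equality, like the shuffle's grouping step.
        TreeMap<String, Integer> groups = new TreeMap<>(collapseAll);
        String[] keys = {"k1:x", "k2:y", "k3:z"};  // hypothetical composite keys
        for (String k : keys) {
            groups.merge(k, 1, Integer::sum);      // all keys land in one "group"
        }

        System.out.println(groups.size());                    // prints 1: one reduce group
        System.out.println(groups.firstEntry().getValue());   // prints 3: records in it
    }
}
```

This is only one plausible reading of the counters; checking what `CompositeKeyComprator.compare` returns for distinct keys would confirm or rule it out.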
When I write the reducer output to a file instead, I get the correct output — but not in the HBase table. Please let me know what I am missing here. Thanks in advance.
Tags: java for-loop mapreduce hbase