hbase数据导入:
参考 http://blog.csdn.net/hua840812/article/details/7414875 ,把代码复制下来后,运行时总是报错:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.LongWritable;
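原因是Mapper必须按照当前版本(新API)的泛型形式来声明输出类型,也就是 extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>,
必须这样写,不能简单地写成不带泛型参数的 extends Mapper:否则自定义的map方法不会覆盖父类的map方法,框架会执行默认的map实现,把LongWritable类型的输入key原样输出,于是出现上面的类型不匹配错误。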
代码还是贴出来:
生成hfile的代码:
package com.bonc.db; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TestHFileToHBase { public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] values = value.toString().split("\\|"); ImmutableBytesWritable rowkey = new ImmutableBytesWritable( values[0].toString().trim().getBytes()); KeyValue kvProtocol; if (values.length>1){ kvProtocol = new KeyValue(values[0].toString().trim().getBytes(), "url_type".getBytes(), "type".getBytes(),System.currentTimeMillis(), values[1].toString().trim() .getBytes()); }else{ kvProtocol=new KeyValue(values[0].toString().trim().getBytes(), "url_type".getBytes(), "type".getBytes(),System.currentTimeMillis(), "NULL".getBytes()); } context.write(rowkey, kvProtocol); // KeyValue kvSrcip = new KeyValue(row, "SRCIP".getBytes(), // "SRCIP".getBytes(), values[1].getBytes()); // context.write(k, kvSrcip); // HFileOutputFormat.getRecordWriter } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "TestHFileToHBase"); 
job.setJarByClass(TestHFileToHBase.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setMapperClass(TestHFileToHBaseMapper.class); job.setReducerClass(KeyValueSortReducer.class); // job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); // job.setNumReduceTasks(4); // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class); // HBaseAdmin admin = new HBaseAdmin(conf); HTable table = new HTable(conf, "url_rule"); HFileOutputFormat.configureIncrementalLoad(job, table); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
hfile导入到表的代码:
1 package com.bonc.db; 2 import java.io.IOException; 3 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.hbase.HBaseConfiguration; 7 import org.apache.hadoop.hbase.client.HTable; 8 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; 9 import org.apache.hadoop.hbase.util.Bytes; 10 11 public class TestLoadIncrementalHFileToHBase { 12 13 // private static final byte[] TABLE = Bytes.toBytes("hua"); 14 // private static final byte[] QUALIFIER = Bytes.toBytes("PROTOCOLID"); 15 // private static final byte[] FAMILY = Bytes.toBytes("PROTOCOLID"); 16 17 public static void main(String[] args) throws IOException { 18 Configuration conf = HBaseConfiguration.create(); 19 // byte[] TABLE = Bytes.toBytes("hua"); 20 byte[] TABLE = Bytes.toBytes(args[0]); 21 HTable table = new HTable(TABLE); 22 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 23 loader.doBulkLoad(new Path(args[1]), table); 24 // loader.doBulkLoad(new Path("/hua/testHFileResult/"), table); 25 } 26 27 }