hbase数据导入:

参考http://blog.csdn.net/hua840812/article/details/7414875,在把代码copy下来后,发现运行总是报错:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.LongWritable;

原因是map的输出必须按照现有的版本来写,也就是extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>

必须按上面的泛型参数完整地写出，不能简单地写成原始类型 extends Mapper——否则框架会按默认类型校验 map 的输出键值类型，导致上述类型不匹配异常。

代码还是贴出来:

生成hfile的代码:

package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

    public static class TestHFileToHBaseMapper  extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\\|");
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                    values[0].toString().trim().getBytes());
            KeyValue kvProtocol;
            if (values.length>1){
             kvProtocol = new KeyValue(values[0].toString().trim().getBytes(), "url_type".getBytes(), "type".getBytes(),System.currentTimeMillis(), values[1].toString().trim()
                    .getBytes());
            }else{
                kvProtocol=new KeyValue(values[0].toString().trim().getBytes(), "url_type".getBytes(), "type".getBytes(),System.currentTimeMillis(), "NULL".getBytes());
            }
            context.write(rowkey, kvProtocol);
            // KeyValue kvSrcip = new KeyValue(row, "SRCIP".getBytes(),
            // "SRCIP".getBytes(), values[1].getBytes());
            // context.write(k, kvSrcip);
//             HFileOutputFormat.getRecordWriter 
        }

    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        job.setMapperClass(TestHFileToHBaseMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);
//        job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // job.setNumReduceTasks(4);
        // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

        // HBaseAdmin admin = new HBaseAdmin(conf);
        HTable table = new HTable(conf, "url_rule");

        HFileOutputFormat.configureIncrementalLoad(job, table);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

hfile导入到表的代码:

 1 package com.bonc.db;
 2 import java.io.IOException;
 3 
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.hbase.HBaseConfiguration;
 7 import org.apache.hadoop.hbase.client.HTable;  
 8 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;  
 9 import org.apache.hadoop.hbase.util.Bytes;  
10   
11 public class TestLoadIncrementalHFileToHBase {  
12   
13     // private static final byte[] TABLE = Bytes.toBytes("hua");  
14     // private static final byte[] QUALIFIER = Bytes.toBytes("PROTOCOLID");  
15     // private static final byte[] FAMILY = Bytes.toBytes("PROTOCOLID");  
16   
17     public static void main(String[] args) throws IOException {  
18         Configuration conf = HBaseConfiguration.create();  
19 //      byte[] TABLE = Bytes.toBytes("hua");  
20         byte[] TABLE = Bytes.toBytes(args[0]);  
21         HTable table = new HTable(TABLE);  
22         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);  
23         loader.doBulkLoad(new Path(args[1]), table);  
24 //      loader.doBulkLoad(new Path("/hua/testHFileResult/"), table);  
25     }  
26   
27 }  
View Code

相关文章:

  • 2021-12-04
  • 2022-12-23
  • 2022-01-01
  • 2021-12-04
  • 2021-11-27
  • 2022-02-09
  • 2021-12-17
  • 2022-12-23
猜你喜欢
  • 2021-12-04
  • 2021-12-04
  • 2021-07-17
  • 2022-12-23
  • 2021-07-13
  • 2022-12-23
  • 2021-12-04
相关资源
相似解决方案