/**
 * Driver that configures and submits the "Hbase.LabelJob" MapReduce job.
 *
 * <p>The job scans the HBase table {@code tb_user} with {@link LabelMapper} and writes the
 * output of {@link LabelReducer} to the HBase table {@code usertags}.
 */
public class LabelJob
{
    
    /**
     * Builds the job, runs it synchronously and terminates the JVM with an exit status that
     * reflects the outcome (0 on success, 1 on failure), so that shell scripts and schedulers
     * can detect a failed run.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args)
        throws Exception
    {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(LabelJob.class);
        job.setJobName("Hbase.LabelJob");
        
        // Passed to the tasks through the job configuration; LabelMapper.setup() reads it.
        Configuration conf = job.getConfiguration();
        conf.set("tablename", "product_tags");
        
        Scan scan = new Scan();
        // Fetch rows from the region server in batches of 500 per scanner round trip.
        scan.setCaching(500);
        // A full-table MapReduce scan should not fill the region servers' block cache.
        scan.setCacheBlocks(false);
        // Input table
        TableMapReduceUtil.initTableMapperJob("tb_user", scan, LabelMapper.class, Text.class, Text.class, job);
        
        job.setReducerClass(LabelReducer.class);
        // Output table
        TableMapReduceUtil.initTableReducerJob("usertags", LabelReducer.class, job);
        
        // waitForCompletion() reports failure through its return value rather than an
        // exception, so the result has to be turned into the process exit status explicitly.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
    
}

  

/**
 * Mapper over rows of an HBase table; declared to emit {@code Text} keys and {@code Text} values.
 *
 * <p>NOTE(review): this listing is abridged. The rest of {@code setup} and {@code map} is
 * elided (the dotted lines), and the {@code fhb} helper used in {@code map} is declared
 * outside the visible code.
 */
public class LabelMapper extends TableMapper<Text, Text>
{
      /**
       * Reads the {@code "tablename"} property from the job configuration.
       * How the value is used afterwards is not visible in this listing.
       *
       * @param context task context giving access to the job configuration
       */
      protected void setup(Context context)
        throws IOException, InterruptedException
    {
        super.setup(context);
        String tablename = context.getConfiguration().get("tablename");
         .................
    } 
 /**
  * Handles one scanned row: decodes the row key to a string and splits the row's label value
  * on commas. What is emitted to the context is elided from this listing.
  *
  * @param rowKey row key of the scanned row
  * @param result cells of the scanned row
  * @param context task context
  */
 protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
        throws IOException, InterruptedException
    {
          String userid = Bytes.toString(rowKey.get()); // read the rowkey of the HBase user table
        
          // presumably "labels" is the column family and "label" the qualifier - confirm against fhb
          String strlabel = fhb.getStringValue(result, "labels", "label");
         // NOTE(review): if getStringValue can return null for a row without this cell,
         // split() throws NullPointerException - confirm and guard if needed.
         String[] userLabels = strlabel.split(",");
....................
}
}

  

/**
 * Reducer that tallies the tags received for a key and writes one {@code Put} to the output
 * table, storing a value in column {@code prodtags:prodtags}.
 *
 * <p>NOTE(review): this listing is abridged. {@code tagMap}, {@code productId} and
 * {@code outputlabel} are declared and populated outside the visible code; confirm their
 * types and lifetimes against the full source.
 */
public class LabelReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
{
    /**
     * Counts each tag in {@code values} into {@code tagMap}, then emits a single {@code Put}
     * whose row is {@code productId} and whose {@code prodtags:prodtags} cell holds
     * {@code outputlabel.toString()}.
     *
     * <p>All strings are encoded as UTF-8 explicitly. {@code String.getBytes()} without a
     * charset uses the platform default, which can differ between cluster nodes and from the
     * UTF-8 decoding that {@code Bytes.toString} applies to row keys in the mapper.
     *
     * @param key map output key
     * @param values tags emitted by the mapper for {@code key}
     * @param context task context that receives the {@code Put}
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
    {
        // Read the map output key. NOTE(review): not used by the visible code below, which
        // keys the Put on productId instead - confirm against the full source.
        String rowKey = key.toString();
        for (Text v : values)
        {
            String tag = v.toString();
            Long count = tagMap.get(tag);
            tagMap.put(tag, (count == null) ? 1 : (count + 1)); // count occurrences
        }

        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        // Encode the row key once so the Put and the output key use identical bytes.
        byte[] row = productId.getBytes(utf8);
        byte[] family = "prodtags".getBytes(utf8);
        byte[] qualifier = "prodtags".getBytes(utf8);

        Put put = new Put(row);
        put.add(family, qualifier, outputlabel.toString().getBytes(utf8));

        context.write(new ImmutableBytesWritable(row), put);
    }

}

  

 

分类:

技术点:

相关文章: