【问题标题】:MapReduce job with mixed data sources: HBase table and HDFS files具有混合数据源的 MapReduce 作业:HBase 表和 HDFS 文件
【发布时间】:2013-07-01 15:18:10
【问题描述】:

我需要实现一个 MR 作业,它可以访问 HBase 表和 HDFS 文件中的数据。例如,mapper 从 HBase 表和 HDFS 文件中读取数据,这些数据共享相同的主键但具有不同的模式。然后 reducer 将所有列(来自 HBase 表和 HDFS 文件)连接在一起。

我尝试在线查找,但找不到使用这种混合数据源运行 MR 作业的方法。 MultipleInputs 似乎只适用于多个 HDFS 数据源。如果您有任何想法,请告诉我。示例代码会很棒。

【问题讨论】:

    标签: mapreduce hbase hdfs


    【解决方案1】:

    经过几天的调查(并从 HBase 用户邮件列表中获得帮助),我终于想出了如何去做。以下是源代码:

    public class MixMR {
    
    public static class Map extends Mapper<Object, Text, Text, Text> {
    
        public void map(Object key, Text value, Context context) throws IOException,   InterruptedException {
            String s = value.toString();
            String[] sa = s.split(",");
            if (sa.length == 2) {
                context.write(new Text(sa[0]), new Text(sa[1]));
            }
    
        }
    
    }
    
    public static class TableMap extends TableMapper<Text, Text>  {
        public static final byte[] CF = "cf".getBytes();
        public static final byte[] ATTR1 = "c1".getBytes();
    
        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    
            String key = Bytes.toString(row.get());
            String val = new String(value.getValue(CF, ATTR1));
    
            context.write(new Text(key), new Text(val));
        }
    }
    
    
    public static class Reduce extends Reducer  <Object, Text, Object, Text> {
        public void reduce(Object key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String ks = key.toString();
            for (Text val : values){
                context.write(new Text(ks), val);
            }
    
        }
    }
    
    public static void main(String[] args) throws Exception {
    Path inputPath1 = new Path(args[0]);
        Path inputPath2 = new Path(args[1]);
        Path outputPath = new Path(args[2]);
    
        String tableName = "test";
    
        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "ExampleRead");
        job.setJarByClass(MixMR.class);     // class that contains mapper
    
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        scan.addFamily(Bytes.toBytes("cf"));
    
        TableMapReduceUtil.initTableMapperJob(
                tableName,        // input HBase table name
                  scan,             // Scan instance to control CF and attribute selection
                  TableMap.class,   // mapper
                  Text.class,             // mapper output key
                  Text.class,             // mapper output value
                  job);
    
    
        job.setReducerClass(Reduce.class);    // reducer class
        job.setOutputFormatClass(TextOutputFormat.class);   
    
    
        // inputPath1 here has no effect for HBase table
        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
        MultipleInputs.addInputPath(job, inputPath2,  TableInputFormat.class, TableMap.class);
    
        FileOutputFormat.setOutputPath(job, outputPath); 
    
        job.waitForCompletion(true);
    }
    

    }

    【讨论】:

    • 但是这不起作用..你能帮忙吗。我的工作是一样的,但它只处理2之间的最后一个映射器。请帮助
    【解决方案2】:

    没有支持此功能的 OOTB 功能。一种可能的解决方法是扫描您的 HBase 表并首先将结果写入 HDFS 文件,然后使用 MultipleInputs 执行 reduce-side join。但这会产生一些额外的 I/O 开销。

    【讨论】:

      【解决方案3】:

      pig 脚本或 hive 查询可以轻松做到这一点。

      示例猪脚本

      tbl = LOAD 'hbase://SampleTable'
             USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
             'info:* ...', '-loadKey true -limit 5')
             AS (id:bytearray, info_map:map[],...);
      
      fle = LOAD '/somefile' USING PigStorage(',') AS (id:bytearray,...);
      
      Joined = JOIN A tbl by id,fle by id;
      STORE Joined to ...
      

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-05-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-12-21
      • 1970-01-01
      相关资源
      最近更新 更多