【问题标题】:Passing objects to MapReduce from a driver从驱动程序将对象传递给 MapReduce
【发布时间】:2016-04-04 20:02:06
【问题描述】:

我创建了一个驱动程序,它读取配置文件,构建对象列表(基于配置)并将该列表传递给 MapReduce(MapReduce 有一个静态属性,该属性包含对该对象列表的引用)。

它有效,但仅限于本地。一旦我在集群配置上运行该作业,我就会收到各种错误,表明该列表尚未构建。这让我觉得我做错了,并且在集群设置上 MapReduce 独立于驱动程序运行。

我的问题是如何正确初始化 Mapper。

(我使用的是 Hadoop 2.4.1)

【问题讨论】:

    标签: hadoop mapreduce


    【解决方案1】:

    这与侧数据分布的问题有关。

    辅助数据分发有两种方法。

    1) 分布式缓存

    2) 配置

    由于您有要共享的对象,我们可以使用配置类。

    这个讨论将依赖于 Configuration 类来使整个集群中的 Object 可用,所有 Mapper 和(或)Reducer 都可以访问。这里的方法很简单。配置类的 setString(String, String) 设置器用于完成此任务。必须共享的对象在驱动端被序列化为java字符串,并在Mapper或Reducer处反序列化回对象。

    在下面的示例代码中,我使用了 com.google.gson.Gson 类来进行简单的序列化和反序列化。您也可以使用 Java 序列化。

    代表您需要共享的对象的类

     public class TestBean {
        String string1;
        String string2;
        public TestBean(String test1, String test2) {
            super();
            this.string1 = test1;
            this.string2 = test2;
        }
        public TestBean() {
            this("", "");
        }
        public String getString1() {
            return string1;
        }
        public void setString1(String test1) {
            this.string1 = test1;
        }
        public String getString2() {
            return string2;
        }
        public void setString2(String test2) {
            this.string2 = test2;
        }
    }
    

    您可以设置配置的主类

    public class GSONTestDriver {
        public static void main(String[] args) throws Exception {
            System.out.println("In Main");
            Configuration conf = new Configuration();
            TestBean testB1 = new TestBean("Hello1","Gson1");
            TestBean testB2 = new TestBean("Hello2","Gson2");
            Gson gson = new Gson();
            String testSerialization1 = gson.toJson(testB1);
            String testSerialization2 = gson.toJson(testB2);
            conf.set("instance1", testSerialization1);
            conf.set("instance2", testSerialization2);
            Job job = new Job(conf, " GSON Test");
            job.setJarByClass(GSONTestDriver.class);
            job.setMapperClass(GSONTestMapper.class);
            job.setNumReduceTasks(0);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }
    

    您可以从中检索对象的映射器类

    public class GSONTestMapper extends
    Mapper<LongWritable, Text, Text, NullWritable> {
        Configuration conf;
        String inst1;
        String inst2;
        public void setup(Context context) {
            conf = context.getConfiguration();
            inst1 = conf.get("instance1");
            inst2 = conf.get("instance2");
            Gson gson = new Gson();
            TestBean tb1 = gson.fromJson(inst1, TestBean.class);
            System.out.println(tb1.getString1());
            System.out.println(tb1.getString2());
            TestBean tb2 = gson.fromJson(inst2, TestBean.class);
            System.out.println(tb2.getString1());
            System.out.println(tb2.getString2());
        } 
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value,NullWritable.get());
        }
    }
    

    使用 com.google.gson.Gson 类的 toJson(Object src) 方法将 bean 转换为序列化的 Json String。然后将序列化的 Json 字符串作为值通过配置实例传递,并通过 Mapper 的名称访问。使用同一 Gson 类的 fromJson(String json, Class classOfT) 方法对字符串进行反序列化。您可以放置​​对象,而不是我的测试 bean。

    【讨论】:

    • 感谢 Arun 的详细回答。我觉得它很有用。
    • @LukaszKujawa :我见过很多人要求这个。所以想把这个放在这里。 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-08-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多