这与侧数据分布的问题有关。
辅助数据分发有两种方法。
1) 分布式缓存
2) 配置
由于您有要共享的对象,我们可以使用配置类。
这个讨论将依赖于 Configuration 类来使整个集群中的 Object 可用,所有 Mapper 和(或)Reducer 都可以访问。这里的方法很简单。配置类的 setString(String, String) 设置器用于完成此任务。必须共享的对象在驱动端被序列化为java字符串,并在Mapper或Reducer处反序列化回对象。
在下面的示例代码中,我使用了 com.google.gson.Gson 类来进行简单的序列化和反序列化。您也可以使用 Java 序列化。
代表您需要共享的对象的类
public class TestBean {
String string1;
String string2;
public TestBean(String test1, String test2) {
super();
this.string1 = test1;
this.string2 = test2;
}
public TestBean() {
this("", "");
}
public String getString1() {
return string1;
}
public void setString1(String test1) {
this.string1 = test1;
}
public String getString2() {
return string2;
}
public void setString2(String test2) {
this.string2 = test2;
}
}
您可以设置配置的主类
public class GSONTestDriver {
public static void main(String[] args) throws Exception {
System.out.println("In Main");
Configuration conf = new Configuration();
TestBean testB1 = new TestBean("Hello1","Gson1");
TestBean testB2 = new TestBean("Hello2","Gson2");
Gson gson = new Gson();
String testSerialization1 = gson.toJson(testB1);
String testSerialization2 = gson.toJson(testB2);
conf.set("instance1", testSerialization1);
conf.set("instance2", testSerialization2);
Job job = new Job(conf, " GSON Test");
job.setJarByClass(GSONTestDriver.class);
job.setMapperClass(GSONTestMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
您可以从中检索对象的映射器类
public class GSONTestMapper extends
Mapper<LongWritable, Text, Text, NullWritable> {
Configuration conf;
String inst1;
String inst2;
public void setup(Context context) {
conf = context.getConfiguration();
inst1 = conf.get("instance1");
inst2 = conf.get("instance2");
Gson gson = new Gson();
TestBean tb1 = gson.fromJson(inst1, TestBean.class);
System.out.println(tb1.getString1());
System.out.println(tb1.getString2());
TestBean tb2 = gson.fromJson(inst2, TestBean.class);
System.out.println(tb2.getString1());
System.out.println(tb2.getString2());
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
使用 com.google.gson.Gson 类的 toJson(Object src) 方法将 bean 转换为序列化的 Json String。然后将序列化的 Json 字符串作为值通过配置实例传递,并通过 Mapper 的名称访问。使用同一 Gson 类的 fromJson(String json, Class classOfT) 方法对字符串进行反序列化。您可以放置对象,而不是我的测试 bean。