【发布时间】:2016-01-29 11:00:00
【问题描述】:
我有两个数据集。两者都在下面给出
第一个数据集
1 A
2 B
3 C
4 D
5 E
第二个数据集
1 ALPHA
2 BRAVO
3 CHARLIE
4 DELTA
5 ECHO
我想使用 reduce side join 加入这个数据集
最终数据应该是这样的
A ALPHA
B BRAVO
C CHARLIE
D DELTA
E ECHO
我编写了以下代码
Mapper(从第一个数据集中提取数据)
public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
}
}
Mapper(从第二个数据集中提取数据)
public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
}
}
Reducer(根据需要加入数据)
public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
private String output1=new String();
private String output2=new String();
private TreeMap<String,String> x1=new TreeMap<String,String>();
private String tokens[];
public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
cleanup(context);
}
public void cleanup(Context context)throws IOException,InterruptedException{
for(Entry y:x1.entrySet())
{
context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
}
}
}
在驱动程序类中包含以下行
MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);
我得到了如下所示的输出,这根本不是我们想要的。
1 m1 A
1 m2 ALPHA
2 m2 BRAVO
2 m1 B
3 m1 C
3 m2 CHARLIE
4 m2 DELTA
4 m1 D
5 m1 E
5 m2 ECHO
尽管我没有将索引包含在 context.write() 中,但我绝对无法弄清楚为什么要打印索引
我什至使用了 cleanup(),仍然得到相同的结果。
请建议如何获得所需的结果,如上所示。
最衷心感谢让我摆脱困境的人:)
经过几次修改后,我得到了这个输出
m1 E
m1 D
m1 C
m1 B
m1 A
m2 ECHO
m2 DELTA
m2 CHARLIE
m2 BRAVO
m2 ALPHA
【问题讨论】:
-
认为这应该是您的映射器输出
-
不,实际上这不是减速器的输出。问题是,我无法将减速器发出的值对齐到一行中。后来我得到了一些输出,比如我要更新到问题区域的输出。
-
但是 m1 或 m2 不是你的树形图的一部分,对吧?
-
对吗?他们仍然进入输出。我试图通过进一步的分析来解决同样的问题。让我们看看接下来会发生什么?您可以在方便的情况下向我建议代码中可能有问题的点或行提前谢谢:)
-
是的 anirudh,你的减速器本身没有被调用,因为 ramzy 正确指出了不正确的签名。因此,您得到的输出只是映射器输出
标签: hadoop mapreduce mapper reducers