【问题标题】:Stuck in the Reduce Join Code卡在减少加入代码中
【发布时间】:2016-01-29 11:00:00
【问题描述】:

我有两个数据集。两者都在下面给出
第一个数据集

1   A
2   B
3   C
4   D
5   E 


第二个数据集

1   ALPHA
2   BRAVO
3   CHARLIE
4   DELTA
5   ECHO


我想使用 reduce side join 加入这个数据集
最终数据应该是这样的


A   ALPHA
B   BRAVO
C   CHARLIE
D   DELTA
E   ECHO


我编写了以下代码
Mapper(从第一个数据集中提取数据)

public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
    private String tokens[];
    public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
        tokens=value.toString().split("\t");
        context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
    }
}

Mapper(从第二个数据集中提取数据)

public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
        private String tokens[];
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("\t");
            context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
        }
}

Reducer(根据需要加入数据)

public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
    private String output1=new String();
    private String output2=new String();
    private TreeMap<String,String> x1=new TreeMap<String,String>();
    private String tokens[];
    public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("\t");
            if(tokens[0].contains("m1"))
            {
                output1=tokens[1];
            }else if(tokens[0].contains("m2"))
            {
                output2=(tokens[1]);
            }
            x1.put(output2, output1);
        cleanup(context);

}
    public void cleanup(Context context)throws IOException,InterruptedException{

        for(Entry y:x1.entrySet())
        {
            context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
        }
    }
}

在驱动程序类中包含以下行

MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);


我得到了如下所示的输出,这根本不是我们想要的。

1       m1      A
1       m2      ALPHA
2       m2      BRAVO
2       m1      B
3       m1      C
3       m2      CHARLIE
4       m2      DELTA
4       m1      D
5       m1      E
5       m2      ECHO

尽管我没有将索引包含在 context.write() 中,但我绝对无法弄清楚为什么要打印索引
我什至使用了 cleanup(),仍然得到相同的结果。 请建议如何获得所需的结果,如上所示。

最衷心感谢让我摆脱困境的人:)

经过几次修改后,我得到了这个输出

m1      E
m1      D
m1      C
m1      B
m1      A
m2      ECHO
m2      DELTA
m2      CHARLIE
m2      BRAVO
m2      ALPHA

【问题讨论】:

  • 认为这应该是您的映射器输出
  • 不,实际上这不是减速器的输出。问题是,我无法将减速器发出的值对齐到一行中。后来我得到了一些输出,比如我要更新到问题区域的输出。
  • 但是 m1 或 m2 不是你的树形图的一部分,对吧?
  • 对吗?他们仍然进入输出。我试图通过进一步的分析来解决同样的问题。让我们看看接下来会发生什么?您可以在方便的情况下向我建议代码中可能有问题的点或行提前谢谢:)
  • 是的 anirudh,你的减速器本身没有被调用,因为 ramzy 正确指出了不正确的签名。因此,您得到的输出只是映射器输出

标签: hadoop mapreduce mapper reducers


【解决方案1】:

reducer 方法应该有 key 和 Iterable 值作为参数。每个reducer都会有以下格式的数据

{1, {"m1 A","m2 ALPHA"}}, {1, {"m2 BA","m2 BRAVO"}}。

请重新检查reducer方法的签名。我假设一旦解决了这个问题,并且如果您的数据是一对一的,您就可以进行相应的映射。如果是一对多,您可能有多个 m1 或 m2,为此,您需要决定如何管理多个值(映射保持为逗号分隔或 json 或 xml 字符串),然后输出最终值。

【讨论】:

  • 感谢拉姆齐的帮助
【解决方案2】:

修改后的代码可能如下

 public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{

               for(Text value : values) {
                tokens=values.toString().split("\t");
                if(tokens[0].contains("m1"))
                {
                    output1=tokens[1];
                }else if(tokens[0].contains("m2"))
                {
                    output2=(tokens[1]);
                }
                x1.put(output2, output1);
               }
            cleanup(context);

    }

【讨论】:

  • 谢谢你的帮助 madhu :)
猜你喜欢
  • 2017-01-25
  • 1970-01-01
  • 1970-01-01
  • 2017-12-30
  • 1970-01-01
  • 2017-10-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多