【问题标题】:MapReduce output ordering in Java8Java8 中的 MapReduce 输出排序
【发布时间】:2018-04-08 23:12:36
【问题描述】:

我尝试使用解决方案在Hadoop 中对我的reducer 的输出进行排序,如本问题所述:

MapReduce sort by value in descending order

这个和Java8有些冲突,我解决如下:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.LinkedHashMap;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;

public class HourlyTweetsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    public Map<String , Integer> map = new LinkedHashMap<String , Integer>();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        map.put(key.toString() , sum);

        result.set(sum);
        context.write(key, result);
    }

    public void cleanup(Context context){
        //Cleanup is called once at the end to finish off anything for reducer
        //Here we will write our final output
        Map<String , Integer>  sortedMap = new HashMap<String , Integer>();
        sortedMap = sortMap(map);

        for (Map.Entry<String,Integer> entry : sortedMap.entrySet()){
            context.write(new Text(entry.getKey()),new IntWritable(entry.getValue()));
        }
    }

    public Map<String , Integer > sortMap (Map<String,Integer> unsortMap){

        Map<String ,Integer> hashmap = new HashMap<String,Integer>();
        int count=0;
        List<Map.Entry<String,Integer>> list = new LinkedList<Map.Entry<String,Integer>>(unsortMap.entrySet());
        //Sorting the list we created from unsorted Map
        Collections.sort(list , new Comparator<Map.Entry<String,Integer>>(){
            public int compare (Map.Entry<String , Integer> o1 , Map.Entry<String , Integer> o2 ){
                //sorting in descending order
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        for(Map.Entry<String, Integer> entry : list){
            // only writing top 3 in the sorted map
            // if(count>2)
            // break;
            hashmap.put(entry.getKey(),entry.getValue());
        }

        return hashmap ;
    }

}

问题是运行作业后输出没有排序:

11  1041557
14  1304166
17  1434978
2   733462
20  1288767
23  1677571
5   460629
8   497403
11  1041557
23  1677571
2   733462
14  1304166
5   460629
17  1434978
8   497403
20  1288767

我们如何解决它?

【问题讨论】:

  • HourlyTweets.java 使用或覆盖已弃用的 API 这是您使用的库还是您自己的程序?
  • 这是我自己基于Hadoop的程序
  • 它只是在错误消息中声明:unreported exception IOException; must be caught or declared to be thrown in HourlyTweetsReducer.java:45
  • 只需在声明的45 行上使用漂亮的try
  • 你是用记事本开发的吗?一个体面的 IDE 甚至可能会为您提供如何操作的选项。

标签: java hadoop java-8 reducers


【解决方案1】:

我不打算判断这段代码是否需要额外的步骤来确保在 Hadoop 的 Map/Reduce 上下文中的正确性。

但一个明显的错误是sortMap的开头有一行

Map<String ,Integer> hashmap = new HashMap<String,Integer>();

它创建了一个不维护任何特定顺序的地图,因此按排序顺序填充它没有效果。它应该是LinkedHashMap,就像链接问答的代码一样。

请注意,这与调用者创建的地图无关:

Map<String , Integer>  sortedMap = new HashMap<String , Integer>();
sortedMap = sortMap(map);

在这里,对创建的地图的引用被sortMap 的结果覆盖,因此地图实例完全过时了。但是,由于您要做的只是在已排序的地图上迭代一次以执行一项操作,因此您根本不需要将已排序的列表复制到结果 Map,因为您可以通过迭代执行该操作名单:

public void cleanup(Context context) {
    //Cleanup is called once at the end to finish off anything for reducer
    //Here we will write our final output

    List<Map.Entry<String,Integer>> list = new ArrayList<>(map.entrySet());

    Collections.sort(list, Map.Entry.comparingByValue(Comparator.reverseOrder()));

    for(Map.Entry<String,Integer> entry: list) {
        context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
    }
}

这使用了 Java 8 的 Map.Entry.comparingByValue(Comparator.reverseOrder()) 内置比较器。如果需要 Java 7 兼容性,请使用问题中显示的比较器代码,

new Comparator<Map.Entry<String, Integer>>() {
    public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
        //sorting in descending order
        return o2.getValue().compareTo(o1.getValue());
    }
}

请注意,此代码使用ArrayList 而不是LinkedList,因为您将使用它执行所有三个操作,1) 使用地图条目集的内容对其进行初始化,2) 对其进行就地排序和3) 迭代它,使用ArrayList 工作得更快。对于 Java 8 中的步骤 2) 尤其如此。

【讨论】:

    【解决方案2】:

    我在这里看到两个选项:

    1. 只需使用单个减速器。这要求所有数据都可以放入单台机器的内存中。然后,单个reducer的输入将按照key的顺序(你想要的)排序。
    2. 使用 TotalOrderPartitioner https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.html

    它在 MapReduce 管道中强制执行一个附加阶段,以将元素划分为已排序的存储桶。

    这是一个展示如何使用 TotalOrderPartioner 的示例(不是我的):https://gist.github.com/asimjalis/e5627dc2ff2b23dac70b

    【讨论】:

    • 只是为了解释选项 1。我看到的问题不是代码,而是运行的减速器的数量。如果多个reducer运行,那么它们会将您的数据分布在多个桶中,并且会对每个桶的数据进行排序并稍后合并它们,从而导致问题
    • 谢谢,它是主作业文件的一部分,而不是映射器或减速器,对吧?
    • 好吧,在你定义reducer的数量的地方做,如果你没有定义它们,那么数量将根据数据的大小来决定。在主作业文件中定义它们是一个很好的做法,但要确保数据可以适合单机的内存。如果不使用选项 2,它肯定会起作用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-31
    相关资源
    最近更新 更多