背景

在Hadoop的整个框架中,设计了Combine-Partition结构。其目的是减少数据通信同步的开销。

但实际上,Hadoop的Combiner和Partitioner在Shuffle和Sort之后执行。且C/P何时被调用,调用几次都是不确定的。

这就给编写高效率的Hadoop程序提出了挑战。有没有解决办法呢?答案是肯定的。

分析算法

Hadoop提供一系列机制来保存Mapper和Reducer的状态。如下图,一个Mapper实例(Reducer实例同理)的状态可以用一个State变量保存。Configure阶段可以看成C++里面的构造函数,Close阶段可以看成C++的析构函数。其中State变量应该在Configure阶段声明。(如果State变量在Map阶段声明,那么在实际运行过程中就会大量创建新对象,且这种State并不能跨运算行。)

 MapReduce设计模式之In-mapper Combining

算法实现

org.apache.hadoop.mapreduce.Mapper类的setup方法实现了Configure的功能,cleanup方法则实现了Close的功能。我们重写即可。

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private HMapKI<String> map = null; @Override protected void setup(Context context) throws IOException, InterruptedException { map = new HMapKI<String>(); } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { InputStream is = new ByteArrayInputStream(value.toString().getBytes()); NxParser parser = new NxParser(is); if (parser.hasNext()) { StringTokenizer itr = new StringTokenizer(nodes[2].toString()); while (itr.hasMoreTokens()) { String t = itr.nextToken(); map.increment(t); } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (String word : map.keySet()) { context.write(new Text(word), new IntWritable(map.get(word))); } } }

解释一下其中的HMapKI类,来自于Jimmy Lin写的cloud9,http://www.umiacs.umd.edu/~jimmylin/projects/index.html 。这个包封装了很多mapreduce常用的数据结构,很好用。另外,强烈推荐Jimmy Lin写的<Data-Intensive Text Processing>这本书,本算法也来自于这本书。

相关文章:

  • 2021-10-22
  • 2022-01-05
  • 2021-11-10
  • 2021-05-28
  • 2022-03-07
  • 2021-10-30
  • 2022-01-24
  • 2022-12-23
猜你喜欢
  • 2021-07-25
  • 2021-06-26
  • 2022-01-11
  • 2021-05-26
  • 2022-12-23
  • 2021-09-28
  • 2021-09-30
相关资源
相似解决方案