【问题标题】:Java Hadoop: How can I create mappers that take as input files and give an output which is the number of lines in each file?Java Hadoop:如何创建以输入文件为输入文件并给出每个文件中行数的输出的映射器?
【发布时间】:2012-05-09 04:34:52
【问题描述】:

我是 Hadoop 新手,我已经设法运行 wordCount 示例:http://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html

假设我们有一个包含 3 个文件的文件夹。我希望每个文件都有一个映射器,这个映射器只会计算行数并将其返回给reducer。

reducer 然后将每个映射器的行数作为输入,并将所有 3 个文件中存在的总行数作为输出。

所以如果我们有以下 3 个文件

input1.txt
input2.txt
input3.txt

映射器返回:

mapper1 -> [input1.txt, 3]
mapper2 -> [input2.txt, 4]
mapper3 -> [input3.txt, 9]

reducer 会输出

3+4+9 = 16 

我已经在一个简单的 Java 应用程序中完成了这项工作,所以我想在 Hadoop 中完成这项工作。我只有 1 台计算机,想尝试在伪分布式环境中运行。

我怎样才能做到这一点?我应该采取哪些正确的步骤?

我的代码应该像 apache 示例中的那样吗?我将有两个静态类,一个用于映射器,一个用于减速器?还是我应该有 3 个类,每个映射器一个?

如果你能指导我完成这个,我不知道该怎么做,我相信如果我设法编写一些代码来做这些事情,那么我将来能够编写更复杂的应用程序。

谢谢!

【问题讨论】:

    标签: java hadoop mapreduce distributed


    【解决方案1】:

    除了 sa125 的答案,您可以通过不为每个输入记录发出记录来极大地提高性能,而只是在映射器中累积一个计数器,然后在映射器清理方法中,发出文件名和计数值:

    public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        protected long lines = 0;
    
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            String filename = split.getPath().toString();
    
            context.write(new Text(filename), new LongWritable(lines));
        }
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            lines++;
        }
    }
    

    【讨论】:

      【解决方案2】:

      我注意到您使用的是 0.18 版本的文档。 Here's a link to 1.0.2(最新)。

      第一个建议 - 使用 IDE(eclipse、IDEA 等)。这真的有助于填补空白。

      在实际的 HDFS 中,您无法知道每个文件的位置(不同的机器和集群)。无法保证 X 行甚至与 Y 行驻留在同一个磁盘上。也不能保证 X 行不会被拆分到不同的机器上(HDFS 以块的形式分布数据,通常每个 64Mb)。 这意味着您不能假设同一个映射器会处理整个文件。你可以确定的是每个文件都由同一个reducer处理

      由于从映射器发送的每个键的减速器都是唯一的,所以我要这样做的方法是使用文件名作为映射器中的输出键。此外,映射器的默认输入类是TextInputFormat,这意味着每个映射器将单独接收一整行(以 LF 或 CR 终止)。然后,您可以从映射器发出文件名和数字 1(或其他与计算无关的)。然后,在 reducer 中,您只需使用循环来计算文件名被接收的次数:

      在映射器的地图函数中

      public static class Map extends Mapper<IntWritable, Text, Text, Text> {
      
        public void map(IntWritable key, Text value, Context context) {
          // get the filename
          InputSplit split = context.getInputSplit();
          String fileName = split.getPath().getName();
      
          // send the filename to the reducer, the value
          // has no meaning (I just put "1" to have something)
          context.write( new Text(fileName), new Text("1") );
        }
      
      }
      

      在reducer的reduce函数中

      public static class Reduce extends Reducer<Text, Text, Text, Text> {
      
        public void reduce(Text fileName, Iterator<Text> values, Context context) {
          long rowcount = 0;
      
          // values get one entry for each row, so the actual value doesn't matter
          // (you can also get the size, I'm just lazy here)
          for (Text val : values) {
            rowCount += 1;
          }
      
          // fileName is the Text key received (no need to create a new object)
          context.write( fileName, new Text( String.valueOf( rowCount ) ) );
        }
      
      }
      

      在驱动程序/主程序中

      您几乎可以使用与 wordcount 示例相同的驱动程序 - 请注意,我使用了新的 mapreduce API,因此您需要调整一些东西(Job 而不是 JobConf 等)。 This was really helpful 当我读到它时。

      请注意,您的 MR 输出将只是每个文件名及其行数:

      input1.txt    3
      input2.txt    4
      input3.txt    9
      

      如果您只想计算所有文件中的总行数,只需在所有映射器中发出相同的键(而不是文件名)。这样就只有一个 reducer 来处理所有的行计数:

      // no need for filename
      context.write( new Text("blah"), new Text("1") );
      

      您还可以链接一个作业来处理每个文件的行数的输出,或者做其他花哨的事情 - 这取决于您。

      我遗漏了一些样板代码,但基本都在那里。一定要检查我,因为我是从记忆中输入的大部分内容.. :)

      希望这会有所帮助!

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-05-14
        相关资源
        最近更新 更多