【问题标题】:The mapreduce code here produces an empty output file. The code and the input is given below此处的 mapreduce 代码生成一个空的输出文件。代码和输入如下
【发布时间】:2017-01-23 11:48:52
【问题描述】:

这里的 mapreduce 代码生成一个空的输出文件。代码和输入如下。

package temperature;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TemperatureMapper extends Mapper<Text, Text, Text, IntWritable> {

  @Override
  public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    if (isValueValid(value.toString())) {
      Text key2 = new Text(getStateFromValue(value.toString()));
      IntWritable value2 = new IntWritable(getTemperatureFrom(value.toString()));
      context.write(key2, value2);
    }
  }

  private boolean isValueValid(final String value) {
    // We expect that the value is a String in the form of : State, Temperature. E.g. MP,77
    Pattern p = Pattern.compile("\\S\\S\\,\\d+");
    Matcher m = p.matcher(value);
    return m.matches();
  }

  private String getStateFromValue(final String value) {
    final String[] subvalues = value.split("\\,");
    return subvalues[0];
  }

  private int getTemperatureFrom(final String value) {
    final String[] subvalues = value.split("\\,");
    return Integer.parseInt(subvalues[1]);
  }
}

    public class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(final Text key, final Iterable<IntWritable> values, final Context context) throws IOException, InterruptedException {
    int sumOfTemperatures = 0;
    int nbValues = 0;
    int average=0;
    for (IntWritable temperature : values) {
      sumOfTemperatures += temperature.get();
      nbValues++;
    }
    average = sumOfTemperatures / nbValues;
    context.write(key, new IntWritable(average));
  }
}
public class average {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    /*if (otherArgs.length != 2) {
      System.err.println("Usage: Main <in> <out>");
      System.exit(-1);
    }*/
    Job job = new Job(conf, "Calculate average Temperature");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

    job.setJarByClass(average.class);

    job.setMapperClass(TemperatureMapper.class);
    job.setReducerClass(TemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : -1);
  }

}

代码对输入工作正常:

Ujjain  MP,77
Bhopal  MP,76
Indore  MP,72
Raipur  CG,72
Durg    CG,75
Raigarth        CG,70
Kendujhar   OR,69
Bhubaneswar OR,71
Puri    OR,76

但不适用于一些随机输入,例如:

hello  VI,6
bye RE,2

它会产生一个空的输出文件。

【问题讨论】:

  • 您能否在 map 方法中的 if 条件之前共享“value.toString()”的输出?或者您是否看到 jobtracker 显示您的地图已收到 2 条记录作为输入?

标签: hadoop mapreduce cloudera


【解决方案1】:

修改您的正则表达式以支持这种输入

    Pattern p = Pattern.compile("[a-zA-Z]*\\s*[a-zA-Z]{2},\\d+$");

另外,您需要再次拆分才能获得状态

String[] subvalues = value.split("\\,")[0].split(" ");
return subvalues[subvalues.length - 1];

我希望它有所帮助。在我这边,我不得不更改值 LongWritable 中的键类型,我不确定为什么我们这边没有抱怨,可能是不同的 api 版本

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-03
    • 1970-01-01
    • 2022-11-09
    • 2013-11-05
    相关资源
    最近更新 更多