【发布时间】:2014-11-24 11:05:06
【问题描述】:
我创建了一个 mapreduce 程序来获取世界指标数据来显示我想要分析的特定指标的结果。 (即二氧化碳排放)。数据排成一排,包括国家、代码、指标、第 1 年排放量、第 2 年排放量等。在我的映射器中,我试图只保留我想要的数据(首先只保留这条线,如果有特定指标),然后保留国家和所有排放水平(在字符串数组中)。
我的整个程序运行,但我注意到它正在接收 Map 输入记录,但没有 Map 输出记录或 Reduce 输入/输出记录。
我一直想弄清楚我的逻辑哪里出了问题,但我很难过。任何意见表示赞赏。
我的代码如下:
---映射器--
package org.myorg;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CO2Mapper extends Mapper <LongWritable, Text, Text, IntWritable>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String delims = ",";
String splitString = value.toString();
String[] tokens = splitString.split(delims);
int tokenCount = tokens.length;
String country = tokens[1];
String indicator = tokens[3];
int levels;
if(indicator.equals("EN.ATM.CO2E.KT"))
{
for (int j = 4; j < tokenCount; j++)
{
levels = Integer.parseInt(tokens[j]);
context.write(new Text(country), new IntWritable(levels));
}
}
}
}
----减速器---
package org.myorg;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CO2Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int maxValue = Integer.MIN_VALUE;
int minValue = Integer.MAX_VALUE;
for(IntWritable val : values)
{
maxValue = Math.max(maxValue, val.get());
minValue = Math.min(minValue, val.get());
}
context.write(key, new IntWritable(maxValue));
context.write(key, new IntWritable(minValue));
}
}
---主要---
package org.myorg;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
public class CO2Levels
{
public static void main(String[] args) throws Exception
{
//with mapreduce
Configuration conf = new Configuration();
Job job = new Job(conf, "co2Levels");
//Job job = new Job();
job.setJarByClass(CO2Levels.class);
//job.setJobName("co2Levels");
job.setMapperClass(CO2Mapper.class);
job.setReducerClass(CO2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
//job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
【问题讨论】:
-
您是否尝试过使用调试器单步调试代码并观察其对一组输入的行为?
-
你能提供一些示例输入文本吗?
-
你的程序很好。我认为问题在于输入。可能文件的第 4 列不包含所需的值,因此不符合
if(indicator.equals("EN.ATM.CO2E.KT"))条件。 -
@voidHead 我尝试添加调试器步骤以查看它在我的映射器中停止的位置,但我在编写代码以便能够在我的输出中看到它时遇到问题(即在 if stmt @987654325 @. stmts 没有出现在我的终端中,我仍然没有输出文件。我很难过。
-
@blackSmith 我将文件保存为 .csv 并将其转换为 .txt 下面是示例行,除非您希望我上传更大的示例集。
标签: java apache hadoop mapreduce mapper