【发布时间】:2014-08-21 17:40:22
【问题描述】:
设置和清理方法到底是做什么用的?我试图找出它们的含义,但还没有人准确描述它们的作用。例如,setup 方法如何使用来自输入拆分的数据?它是一个整体吗?还是一行一行?
【问题讨论】:
设置和清理方法到底是做什么用的?我试图找出它们的含义,但还没有人准确描述它们的作用。例如,setup 方法如何使用来自输入拆分的数据?它是一个整体吗?还是一行一行?
【问题讨论】:
如前所述,setup() 和 cleanup() 是您可以覆盖的方法,如果您选择的话,它们可以让您初始化和清理您的 map/reduce 任务。在这些阶段,您实际上无法直接访问来自输入拆分的任何数据。 map/reduce 任务的生命周期是(从程序员的角度来看):
设置 -> 地图 -> 清理
设置 -> 减少 -> 清理
setup() 期间通常发生的情况是,您可以从配置对象中读取参数来自定义处理逻辑。
cleanup() 期间通常发生的情况是您清理了您可能已分配的所有资源。还有其他用途,即清除聚合结果的任何累积。
setup() 和 cleanup() 方法只是让开发人员/程序员有机会在 map/reduce 任务之前和之后做某事的“钩子”。
例如,在规范字数统计示例中,假设您要排除某些字词(例如“the”、“a”、“be”等停用词)。当您配置 MapReduce 作业时,您可以将这些单词的列表(逗号分隔)作为参数(键值对)传递到配置对象中。然后在您的地图代码中,在setup() 期间,您可以获取停用词并将它们存储在某个全局变量(地图任务的全局变量)中,并在地图逻辑期间排除对这些词的计数。这是来自http://wiki.apache.org/hadoop/WordCount 的修改示例。
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private Set<String> stopWords;
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
stopWords = new HashSet<String>();
for(String word : conf.get("stop.words").split(",")) {
stopWords.add(word);
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
if(stopWords.contains(token)) {
continue;
}
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("stop.words", "the, a, an, be, but, can");
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
【讨论】:
setup: Called once at the beginning of the task.
您可以在此处放置自定义初始化。
cleanup: Called once at the end of the task.
你可以把资源释放放在这里。
【讨论】:
setup 和 cleanup 为每个任务调用一次。
例如,您有 5 个映射器正在运行,对于每个映射器您要初始化一些值,然后您可以使用 setup.您的 setup 方法被调用了 5 次。
因此,对于每个 mapreduce,首先调用 setup() 方法,然后调用 map()/reduce() 方法,然后在退出任务之前调用 cleanup() 方法。
【讨论】: