【问题标题】:Hadoop Mapreduce, How do I rewrite a txt file inputted in the mapper with map reduce output?Hadoop Mapreduce,如何使用 map reduce 输出重写在映射器中输入的 txt 文件?
【发布时间】:2018-05-21 01:58:54
【问题描述】:

我正在尝试创建一个 map reduce 程序来执行 k-means 算法。我知道使用 map reduce 并不是执行迭代算法的最佳方法。 我已经创建了映射器和减速器类。 在映射器代码中,我读取了一个输入文件。当 map reduce 完成后,我希望将结果存储在同一个输入文件中。如何让输出文件覆盖映射器输入的文件? 另外,我使映射减少迭代,直到旧输入文件和新输入文件中的值收敛,即值之间的差异小于 0.1

我的代码是:

 import java.io.IOException;
 import java.util.StringTokenizer;
 import java.util.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.Mapper;
 import java.io.FileReader;
 import java.io.BufferedReader;
 import java.util.ArrayList;


public class kmeansMapper extends Mapper<Object, Text, DoubleWritable, 
DoubleWritable> {
private final static String centroidFile = "centroid.txt";
private List<Double> centers = new ArrayList<Double>();

public void setup(Context context) throws IOException{
        BufferedReader br = new BufferedReader(new 
        FileReader(centroidFile));
        String contentLine;
        while((contentLine = br.readLine())!=null){
            centers.add(Double.parseDouble(contentLine));
        }
}

public void map(Object key, Text input, Context context) throws IOException, 
InterruptedException {

        String[] fields = input.toString().split("  ");
        Double rating = Double.parseDouble(fields[2]);
        Double distance = centers.get(0) - rating;
        int position = 0;
        for(int i=1; i<centers.size(); i++){
            Double cDistance = Math.abs(centers.get(i) - rating);
            if(cDistance< distance){
                position = i;
                distance = cDistance;
            }
        }
        Double closestCenter = centers.get(position);
        context.write(new DoubleWritable(closestCenter),new 
DoubleWritable(rating)); //outputs closestcenter and rating value

        }
}
import java.io.IOException;
import java.lang.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.*;

public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable, 
DoubleWritable, Text> {

public void reduce(DoubleWritable key, Iterable<DoubleWritable> values, 
Context context)// get count // get total //get values in a string
          throws IOException, InterruptedException {
            Iterator<DoubleWritable> v = values.iterator();
            double total = 0;
            double count = 0;
            String value = ""; //value is the rating
            while (v.hasNext()){
              double i = v.next().get();
              value = value + " " + Double.toString(i);
              total = total + i;
              ++count;
            }
            double nCenter = total/count;
  context.write(new DoubleWritable(nCenter), new Text(value));
}
}
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class run
{

 public static void runJob(String[] input, String output) throws Exception {

    Configuration conf = new Configuration();

  Job job = new Job(conf);
  Path toCache = new Path("input/centroid.txt"); 
  job.addCacheFile(toCache.toUri());
  job.setJarByClass(run.class);
  job.setMapperClass(kmeansMapper.class);
  job.setReducerClass(kmeansReducer.class);
  job.setMapOutputKeyClass(DoubleWritable.class);
  job.setMapOutputValueClass(DoubleWritable.class);

  job.setNumReduceTasks(1);
  Path outputPath = new Path(output);
  FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
  FileOutputFormat.setOutputPath(job, outputPath);
  outputPath.getFileSystem(conf).delete(outputPath,true);
  job.waitForCompletion(true);

}

public static void main(String[] args) throws Exception {
   runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]);

}

}

谢谢

【问题讨论】:

    标签: algorithm hadoop mapreduce bigdata k-means


    【解决方案1】:

    我知道您提出了免责声明。但请切换到 Spark 或其他可以解决内存问题的框架。你的生活会好很多。

    如果您真的想这样做,只需在 runJob 中迭代运行代码并使用临时文件名作为输入。您可以看到this question on moving files in hadoop 来实现这一点。您需要一个 FileSystem 实例和一个临时文件作为输入:

    FileSystem fs = FileSystem.get(new Configuration());
    Path tempInputPath = Paths.get('/user/th/kmeans/tmp_input';
    

    一般来说,每次迭代完成后,做

    fs.delete(tempInputPath)
    fs.rename(outputPath, tempInputPath)
    

    当然,对于第一次迭代,您必须将输入路径设置为运行作业时提供的输入路径。后续迭代可以使用 tempInputPath,它将是上一次迭代的输出。

    【讨论】:

    • 您好,感谢您的回复,如何在runjob中迭代代码?
    • 您只需将 runJob 中的代码的必要部分包装在一个正常的 for 循环中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-01-21
    • 2014-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多