【发布时间】:2014-10-25 13:00:03
【问题描述】:
在 hadoop 中,我正在编写我的自定义数据类型,如下所示
导入 java.io.DataInput;
导入 java.io.DataOutput;
导入 java.io.IOException;
导入 org.apache.hadoop.io.WritableComparable;
公共类电影实现 WritableComparable {
字符串电影ID;
字符串电影标题;
公共电影(字符串电影ID,字符串电影标题){
极好的();
this.movieId = 电影ID;
this.movieTitle = 电影标题;
}
公共电影(){
}
公共字符串 getMovieId() {
返回电影ID;
}
公共无效 setMovieId(字符串电影 ID){
this.movieId = 电影ID;
}
公共字符串 getMovieTitle() {
返回电影标题;
}
公共无效 setMovieTitle(字符串电影标题){
this.movieTitle = 电影标题;
}
@覆盖
public void readFields(DataInput in) 抛出 IOException {
movieId = in.readLine();
电影标题=in.readLine();
}
@覆盖
公共无效写入(数据输出)抛出 IOException {
// TODO 自动生成的方法存根
out.writeChars(movieId);
out.writeChars(movieTitle);
}
@覆盖
公共 int compareTo(电影 o){
返回movieTitle.compareTo(o.movieTitle);
}
@覆盖
公共 int hashCode(){
返回movieId.hashCode();
}
@覆盖
公共布尔等于(对象o){
电影 m=(电影)o;
返回movieId.equals(m.movieId);
}
@覆盖
公共字符串 toString(){
返回电影标题;
}
}
下面是我的映射器代码
导入 java.io.BufferedReader; 导入 java.io.FileReader; 导入 java.io.IOException; 导入 java.net.URI; 导入 java.util.ArrayList; 导入 java.util.Arrays; 导入 java.util.HashMap; 导入 java.util.List; 导入 java.util.Map; 导入 org.apache.hadoop.filecache.DistributedCache; 导入 org.apache.hadoop.io.LongWritable; 导入 org.apache.hadoop.io.Text; 导入 org.apache.hadoop.mapreduce.Mapper; 公共类 MovieMapper 扩展 Mapper { 地图>movieMap=new HashMap>(); @覆盖 public void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{ String[] columns=value.toString().split("::"); 如果(列。长度!= 4){ System.out.println("长度不等于4"); 返回; } 如果(movieMap.containsKey(列[1])){ 列表 mList=movieMap.get(columns[1]); // 设置电影 //System.out.println("在mapper中,movieId="+mList.get(0)+", name="+mList.get(1)); 电影movie=新电影(mList.get(0),mList.get(1)); //movie.setMovieId(mList.get(0)); //movie.setMovieTitle(mList.get(1)); // 设置 MovieRating MovieRating mr=new MovieRating(); mr.setUserId(列[0]); mr.setRating(Integer.parseInt(columns[2])); mr.setTime(列[3]); ctx.write(movie,mr); } } @覆盖 受保护的无效设置(上下文 ctx)抛出 IOException { 加载电影数据(ctx); } 公共 void loadMovieData(Context ctx) 抛出 IOException{ URI[] cacheFiles = DistributedCache.getCacheFiles(ctx.getConfiguration()); System.out.println("inloadMovieData"); if(cacheFiles!=null && cacheFiles.length>0){ System.out.println("缓存文件长度大于 0"); for(URI 路径:cacheFiles){ System.out.println("缓存文件="+path.toString()); BufferedReader 阅读器=null; 尝试{ reader=new BufferedReader(new FileReader(path.toString())); 字符串线; while((line=reader.readLine())!=null){ String[] 列 = line.split("::"); movieMap.put(columns[0], new ArrayList(Arrays.asList(columns))); } }catch(异常 e){ e.printStackTrace(); } 最后{ reader.close(); } } } } }在映射器类中,当控制到达 ctx.write(movie,mr) 时,它会显示溢出失败的问题。我的减速器将输入键作为 Movie,将值作为 MovieRating。
【问题讨论】: