用 Vi 打开 tianqi.txt,把最后一行的空格删掉。

温度

天气案例:细粒度介绍计算框架

需求:

找出每个月气温最高的2天

1949-10-01 14:21:02 34c

1949-10-01 19:21:02 38c

1949-10-02 14:01:02 36c

1950-01-01 11:21:02 32c

1950-10-01 12:21:02 37c

1951-12-01 12:21:02 23c

1950-10-02 12:21:02 41c

1950-10-03 12:21:02 27c

1951-07-01 12:21:02 45c

1951-07-02 12:21:02 46c

1951-07-03 12:21:03 47c

MapReduce部分:天气案例:细粒度介绍计算框架

思路分析:

 

 1,MR

*保证原语

怎样划分数据,怎样定义一组

2,k:v映射的设计

考虑reduce的计算复杂度

3,能不能多个reduce

倾斜:抽样

集群资源情况

4,自定义数据类型

思路:

每年

每个月

最高

2天

1天多条记录?

进一步思考

年月分组

温度升序

key中要包含时间和温度呀!

MR原语:相同的key分到一组

通过GroupComparator设置分组规则

自定义数据类型Weather

包含时间

包含温度

自定义排序比较规则

自定义分组比较

年月相同被视为相同的key

那么reduce迭代时,相同年月的记录有可能是同一天的

reduce中需要判断是否同一天

注意OOM

数据量很大

全量数据可以切分成最少按一个月份的数据量进行判断

这种业务场景可以设置多个reduce

通过实现partition

 

 

 

代码实现:

客户端部分:

package com.sxt.hadoop.mr.tq;


import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MyTQ {

    /**
     * Job driver: wires the mapper, partitioner, shuffle-sort comparator,
     * grouping comparator and reducer together, then submits the "tq" job.
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration(true);
        Job job = Job.getInstance(configuration);

        job.setJarByClass(MyTQ.class);
        job.setJobName("tq");

        // Input directory on HDFS.
        FileInputFormat.addInputPath(job, new Path("/data/tq/input"));

        // Output directory: delete a stale one first so the job can re-run.
        Path outputDir = new Path("/data/tq/output");
        if (outputDir.getFileSystem(configuration).exists(outputDir)) {
            outputDir.getFileSystem(configuration).delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);

        // ----- map side -----
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Route keys to reduce tasks by year so a month is never split
        // across partitions.
        job.setPartitionerClass(TPartitioner.class);

        // Order of the composite key during the shuffle sort.
        job.setSortComparatorClass(TSortComparator.class);

        // ----- reduce side -----
        // Records of the same year/month form one reduce() group.
        job.setGroupingComparatorClass(TGroupComparator.class);
        job.setReducerClass(TReducer.class);
        job.setNumReduceTasks(2);

        job.waitForCompletion(true);
    }

}

 

类一:

package com.sxt.hadoop.mr.tq;


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;


public class TMapper extends Mapper<LongWritable,Text,TQ,IntWritable>{

    // Reused output key/value: Hadoop serializes them inside context.write(),
    // so a single instance per mapper is safe and avoids per-record allocation.
    private final TQ mkey = new TQ();
    private final IntWritable mval = new IntWritable();

    // Hoisted out of map(): the original allocated a new SimpleDateFormat for
    // every record. SimpleDateFormat is not thread-safe, but each mapper
    // instance is driven by a single thread, so an instance field is safe.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    /**
     * Parses one input line, e.g. "1949-10-01 14:21:02\t34c" (tab-separated),
     * into a composite TQ key (year/month/day/temperature) and the temperature
     * as the map output value. Malformed lines are logged and skipped so one
     * bad record does not fail the whole task.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // strs[0] = "1949-10-01 14:21:02", strs[1] = "34c"
        String[] strs = StringUtils.split(value.toString(), '\t');
        try {
            // parse() only consumes the leading "yyyy-MM-dd" prefix.
            Date date = sdf.parse(strs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);

            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar.MONTH is 0-based
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

            // Strip the trailing unit character: "34c" -> 34.
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));
            mkey.setWd(wd);
            mval.set(wd);

            context.write(mkey, mval);
        } catch (Exception e) {
            // Deliberate best-effort: skip malformed records instead of
            // aborting the task; the stack trace lands in the task log.
            e.printStackTrace();
        }
    }

}

类二:

package com.sxt.hadoop.mr.tq;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.WritableComparable;


public class TQ implements WritableComparable<TQ>{

    // Composite map-output key: a calendar date plus the temperature ("wd",
    // from Chinese "wendu") measured on that date.
    private int year;
    private int month;
    private int day;
    private int wd;

    /** No-arg constructor required by Hadoop's serialization machinery. */
    public TQ() {
        super();
    }

    /** Convenience constructor setting all four fields at once. */
    public TQ(int year, int month, int day, int wd) {
        super();
        this.year = year;
        this.month = month;
        this.day = day;
        this.wd = wd;
    }

    @Override
    public String toString() {
        return "TQ [year=" + year + ", month=" + month + ", day=" + day + ", wd=" + wd + "]";
    }

    public int getYear() { return year; }

    public void setYear(int year) { this.year = year; }

    public int getMonth() { return month; }

    public void setMonth(int month) { this.month = month; }

    public int getDay() { return day; }

    public void setDay(int day) { this.day = day; }

    public int getWd() { return wd; }

    public void setWd(int wd) { this.wd = wd; }

    /** Deserializes the four fields in the exact order write() emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
        day = in.readInt();
        wd = in.readInt();
    }

    /** Serializes year, month, day, wd — keep in sync with readFields(). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    /**
     * Natural ordering: year asc, then month asc, then day asc; the
     * temperature is not part of the natural order. (The job overrides this
     * with an explicit sort comparator, but WritableComparable requires it.)
     */
    @Override
    public int compareTo(TQ that) {
        int cmp = Integer.compare(year, that.getYear());
        if (cmp != 0) {
            return cmp;
        }
        cmp = Integer.compare(month, that.getMonth());
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(day, that.getDay());
    }

}

类三:

package com.sxt.hadoop.mr.tq;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;


public class TPartitioner extends Partitioner<TQ, IntWritable>{

    /**
     * Routes every record of the same year to the same reduce task.
     * Grouping is by year+month, so keeping a whole year on one reducer
     * guarantees a month is never split across partitions.
     */
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        int year = key.getYear();
        return year % numPartitions;
    }

}

类四:

package com.sxt.hadoop.mr.tq;


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


public class TSortComparator extends WritableComparator{

    public TSortComparator() {
        // true => instantiate keys so compare() receives deserialized TQ objects.
        super(TQ.class, true);
    }

    /**
     * Shuffle-sort order: year asc, month asc, temperature DESC.
     *
     * Fix: the original used the day as the final tie-breaker, leaving each
     * month's records in date order — TReducer would then emit the first two
     * distinct days instead of the two hottest ones. Sorting the temperature
     * descending within a year/month puts the hottest reading first, which is
     * exactly what TReducer's "first record, then first different day" logic
     * relies on to satisfy the requirement (top-2 hottest days per month).
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;

        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 != 0) {
            return c1;
        }
        int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
        if (c2 != 0) {
            return c2;
        }
        // Negated for descending temperature within the same year/month.
        return -Integer.compare(t1.getWd(), t2.getWd());
    }

}

类五:

package com.sxt.hadoop.mr.tq;


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


public class TGroupComparator extends WritableComparator{

    public TGroupComparator() {
        // true => instantiate keys so compare() receives deserialized TQ objects.
        super(TQ.class, true);
    }

    /**
     * Grouping rule for the reduce phase: two keys belong to the same group
     * when year AND month match; day and temperature are ignored here, so all
     * records of one month flow into a single reduce() call.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }

}

类六:

package com.sxt.hadoop.mr.tq;


import java.io.IOException;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class TReducer extends Reducer<TQ,IntWritable,Text,IntWritable>{


// Reused output key: formatted as "year-month-day".
Text rkey=new Text();
// Reused output value: the temperature of the emitted day.
IntWritable rval=new IntWritable();

/**
 * For each year/month group, writes the first record, then the first
 * subsequent record that falls on a DIFFERENT day, as "year-month-day" -> temp.
 *
 * This yields the two hottest days of the month only if the shuffle sort puts
 * the highest temperature first within each group — NOTE(review): confirm the
 * configured sort comparator orders temperature descending.
 *
 * NOTE(review): while iterating `values`, Hadoop deserializes each record
 * in place, which also mutates `key` — so key.getDay()/getWd() reflect the
 * CURRENT record, not the first one. The loop below depends on that.
 */
@Override
protected void reduce(TQ key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
// Records sharing a grouping key form one group; reduce() is called once
// per group and iterates the group's records here.
// Goal: the two days with the highest temperature.
int flg=0; // 0 until the first record of the group has been written
int day=0; // day-of-month of the first emitted record

for(IntWritable v: values){
if(flg==0){
// First record of the group: emit it and remember its day.
rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
rval.set(key.getWd());
context.write(rkey, rval);
day=key.getDay();
flg++;
}
if(flg!=0 && day!=key.getDay()){
// First record on a different day: emit it and stop — two days are enough.
rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
rval.set(key.getWd());
context.write(rkey, rval);

break;
}
}
}

}

最后执行:

 

天气后边的执行流程:

首先是导出jar包并且上传到root根目录下

在root根目录下创建tq.txt的文档。并且存入数据

创建目录:hdfs dfs -mkdir -p /data/tq/input

将数据存储到 /data/tq/input下:

hdfs dfs -put tq.txt /data/tq/input

开始执行:

hadoop jar MyTQ.jar com.sxt.hadoop.mr.tq.MyTQ

相关文章:

  • 2021-09-15
  • 2021-08-30
  • 2021-08-01
  • 2021-08-08
  • 2021-11-15
猜你喜欢
  • 2021-10-27
  • 2021-12-08
  • 2021-07-29
  • 2022-01-17
  • 2022-01-14
  • 2021-09-27
相关资源
相似解决方案