vi 一个 tianqi.txt,把最后一行的空格删掉。
温度
天气案例:细粒度介绍计算框架
需求:
找出每个月气温最高的2天
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
思路分析:
1,MR
*保证原语
怎样划分数据,怎样定义一组
2,k:v映射的设计
考虑reduce的计算复杂度
3,能不能多个reduce
倾斜:抽样
集群资源情况
4,自定义数据类型
思路:
每年
每个月
最高
2天
1天多条记录?
进一步思考
年月分组
温度升序
key中要包含时间和温度呀!
MR原语:相同的key分到一组
通过GroupComparator设置分组规则
自定义数据类型Weather
包含时间
包含温度
自定义排序比较规则
自定义分组比较
年月相同被视为相同的key
那么reduce迭代时,相同年月的记录有可能是同一天的
reduce中需要判断是否同一天
注意OOM
数据量很大
全量数据可以切分成最少按一个月份的数据量进行判断
这种业务场景可以设置多个reduce
通过实现partition
代码实现:
客户端部分:
package com.sxt.hadoop.mr.tq;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyTQ {

    /**
     * Driver: configures and submits the "tq" MapReduce job.
     *
     * Pipeline: TMapper -> TPartitioner (partition by year)
     * -> TSortComparator (shuffle sort) -> TGroupComparator (year+month
     * groups) -> TReducer, with 2 reduce tasks.
     *
     * Fix: the boolean result of waitForCompletion() was ignored, so a
     * failed job still exited with status 0; the exit code now reflects
     * job success, which schedulers and shell scripts rely on.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyTQ.class);
        job.setJobName("tq");

        // Input directory (hard-coded HDFS path, as in the exercise).
        Path input = new Path("/data/tq/input");
        FileInputFormat.addInputPath(job, input);

        // Output directory: delete a stale one so the job can be rerun.
        Path output = new Path("/data/tq/output");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Map side.
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Partition: route each record to a reducer by year modulo count.
        job.setPartitionerClass(TPartitioner.class);
        // Sort order of keys within each partition during the shuffle.
        job.setSortComparatorClass(TSortComparator.class);

        // Reduce side: keys with equal year+month form one reduce group.
        job.setGroupingComparatorClass(TGroupComparator.class);
        job.setReducerClass(TReducer.class);
        job.setNumReduceTasks(2);

        // Submit and propagate success/failure via the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
类一:
package com.sxt.hadoop.mr.tq;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {

    // Output key/value reused across map() calls: Hadoop serializes them
    // immediately on context.write(), so one instance per mapper avoids
    // a per-record allocation.
    TQ mkey = new TQ();
    IntWritable mval = new IntWritable();
    // Fix: previously a new SimpleDateFormat was built on every map()
    // call; it is expensive to construct, and a Mapper instance is used
    // by a single thread, so hoisting it to a field is safe.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    /**
     * Parses one line like "1949-10-01 14:21:02\t34c" into a TQ key
     * (year/month/day/temperature) and the temperature as the value.
     * Malformed records are logged and skipped rather than failing the task.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // strs[0] = "1949-10-01 14:21:02", strs[1] = "34c" (tab-separated).
        String[] strs = StringUtils.split(value.toString(), '\t');
        try {
            // The pattern "yyyy-MM-dd" parses the leading date portion;
            // the time of day is ignored.
            Date date = sdf.parse(strs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            mkey.setYear(cal.get(Calendar.YEAR));
            // Calendar months are 0-based; store 1-12.
            mkey.setMonth(cal.get(Calendar.MONTH) + 1);
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            // Strip the trailing unit character: "34c" -> 34.
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));
            mkey.setWd(wd);
            mval.set(wd);
            context.write(mkey, mval);
        } catch (Exception e) {
            // Bad record (unparseable date or temperature): skip it.
            e.printStackTrace();
        }
    }
}
类二:
package com.sxt.hadoop.mr.tq;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite map-output key for the weather job: a calendar day plus the
 * temperature (wd) observed on it. Serialized field order in write() and
 * readFields() must match: year, month, day, wd.
 */
public class TQ implements WritableComparable<TQ> {

    private int year;
    private int month;
    private int day;
    private int wd; // temperature

    public TQ() {
        super();
    }

    public TQ(int year, int month, int day, int wd) {
        super();
        this.year = year;
        this.month = month;
        this.day = day;
        this.wd = wd;
    }

    @Override
    public String toString() {
        return "TQ [year=" + year + ", month=" + month + ", day=" + day + ", wd=" + wd + "]";
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }

    /** Deserializes fields in the exact order write() emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
        day = in.readInt();
        wd = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    /**
     * Default ordering: year, then month, then day, all ascending.
     * (The job overrides shuffle ordering with TSortComparator.)
     */
    @Override
    public int compareTo(TQ that) {
        int c = Integer.compare(year, that.getYear());
        if (c != 0) {
            return c;
        }
        c = Integer.compare(month, that.getMonth());
        if (c != 0) {
            return c;
        }
        return Integer.compare(day, that.getDay());
    }
}
类三:
package com.sxt.hadoop.mr.tq;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class TPartitioner extends Partitioner<TQ, IntWritable> {

    /**
     * Routes a record to a reduce partition by year modulo the number of
     * partitions, so all records of one year land on the same reducer.
     */
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        int year = key.getYear();
        return year % numPartitions;
    }
}
类四:
package com.sxt.hadoop.mr.tq;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TSortComparator extends WritableComparator {

    public TSortComparator() {
        // true: instantiate TQ objects so compare() receives deserialized keys.
        super(TQ.class, true);
    }

    /**
     * Shuffle sort order: year asc, month asc, then temperature (wd) DESC.
     *
     * Fix: the original compared day ascending as the third criterion, so
     * records arrived at the reducer in calendar order and TReducer — which
     * emits the first record of each year-month group and the first record
     * from a different day — produced the first two days of the month
     * instead of the two hottest days. Sorting the temperature descending
     * within each year-month makes the reducer's assumption hold.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // Descending: hottest record first within the group.
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
            return c2;
        }
        return c1;
    }
}
类五:
package com.sxt.hadoop.mr.tq;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TGroupComparator extends WritableComparator {

    public TGroupComparator() {
        // true: instantiate TQ objects so compare() receives deserialized keys.
        super(TQ.class, true);
    }

    /**
     * Grouping rule for reduce: two keys belong to the same group when
     * their year and month match; day and temperature are ignored, so one
     * reduce() call sees all records of a year-month.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ left = (TQ) a;
        TQ right = (TQ) b;
        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
类六:
package com.sxt.hadoop.mr.tq;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TReducer extends Reducer<TQ,IntWritable,Text,IntWritable>{
// Output objects reused across reduce() calls to avoid per-record allocation.
Text rkey=new Text();
IntWritable rval=new IntWritable();
/**
 * Emits up to two "year-month-day temperature" lines per year-month group:
 * the first record of the group and the first record from a different day.
 *
 * NOTE(review): this only yields the two HOTTEST days if the sort
 * comparator orders records within a year-month by temperature descending
 * — TODO confirm TSortComparator does so (as written it sorts by day).
 *
 * Hadoop reuses the key object during iteration: advancing the values
 * iterator mutates `key` in place to the fields of the current record,
 * which is why key.getDay()/getWd() change inside the loop.
 */
@Override
protected void reduce(TQ key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
//Records with the "same" key (equal year+month, per the grouping comparator)
//form one group; reduce() is called once per group.
//Iterate the group's records to compute the result.
//Goal: the two days with the highest temperature.
int flg=0;
int day=0;
for(IntWritable v: values){
// First record of the group: emit it and remember its day.
if(flg==0){
rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
rval.set(key.getWd());
context.write(rkey, rval);
day=key.getDay();
flg++;
}
// First record from a DIFFERENT day: emit it and stop.
// (On the same iteration as the first write, day==key.getDay(),
// so this branch is skipped for that record.)
if(flg!=0 && day!=key.getDay()){
rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
rval.set(key.getWd());
context.write(rkey, rval);
break;
}
}
}
}
最后执行:
天气后边的执行流程:
首先是导出jar包并且上传到root根目录下
在root根目录下创建tq.txt的文档。并且存入数据
创建目录:hdfs dfs -mkdir -p /data/tq/input
将数据存储到 /data/tq/input下:
hdfs dfs -put tq.txt /data/tq/input
开始执行:
hadoop jar MyTQ.jar com.sxt.hadoop.mr.tq.MyTQ