一、数据倾斜分析——mapJoin
1.背景
接上一个day的Join算法,我们的解决join的方式是:在reduce端通过pid进行串接,这样的话:
--order
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
--product
P0001,小米5,1000,2
P0002,锤子T1,1000,3
例如订单中的小米5卖的比较好(截止博客时间,已经是米7将出的时候了。),这样的话大部分的数据都流向了P0001的这个reduce上,而P0002
的锤子的reduce确很轻松,这样,就产生了数据倾斜了!
更多的数据倾斜的介绍,参考:http://blog.csdn.net/u010039929/article/details/55044407
我们这里用的是比较简单的map端join!也就是不需要通过reduce来串接了。具体来说就是在map端就直接拼接好,不需要reduce来拼接;那我们就需要在map的阶段进行join连接,也就是map端就需要能够连接,那就是产品全表(字典表)需要在map端就有这个字典表,放在内存而不放在输入文件。这里
mapreduce给我们提供了一个很棒的解决方案:DistributedCache,了解这个,可以参考:http://blog.csdn.net/lzm1340458776/article/details/42971075
相关的分布式缓存的用法,参考:http://blog.csdn.net/qq1010885678/article/details/50751007
当然,首先应当查看的,应该是官方文档的介绍:点击查看
2.代码
package com.mr.mapjoin; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; /** * mapper * * @author zcc ON 2018/2/5 **/ public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { Map<String, String> infoMap = new HashMap<>(); Text k = new Text(); /** * 启动之前进行一些必要的初始化工作 * @param context 上下文 * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { // BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"))); String path = context.getCacheFiles()[0].getPath(); BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path))); String line; while (StringUtils.isNotEmpty(line = br.readLine())) { String[] fields = line.split(","); // 将字典加载进入map infoMap.put(fields[0], fields[1]); } // 关闭流 br.close(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String orderLine = value.toString(); // 切分订单信息 String[] fields = orderLine.split("\t"); String pName = infoMap.get(fields[1]); k.set(orderLine + "\t" + pName); // 写出去 context.write(k, NullWritable.get()); } }