一、数据倾斜分析——mapJoin

  1.背景

    接上一个day的Join算法,我们的解决join的方式是:在reduce端通过pid进行串接,这样的话:

--order
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
--product
P0001,小米5,1000,2
P0002,锤子T1,1000,3

    例如订单中的小米5卖的比较好(截止博客时间,已经是米7将出的时候了。),这样的话大部分的数据都流向了P0001的这个reduce上,而P0002

的锤子的reduce确很轻松,这样,就产生了数据倾斜了!

    更多的数据倾斜的介绍,参考http://blog.csdn.net/u010039929/article/details/55044407

    我们这里用的是比较简单的map端join!也就是不需要通过reduce来串接了。具体来说就是在map端就直接拼接好,不需要reduce来拼接;那我们就需要在map的阶段进行join连接,也就是map端就需要能够连接,那就是产品全表(字典表)需要在map端就有这个字典表,放在内存而不放在输入文件。这里

mapreduce给我们提供了一个很棒的解决方案:DistributedCache,了解这个,可以参考http://blog.csdn.net/lzm1340458776/article/details/42971075

    相关的分布式缓存的用法,参考http://blog.csdn.net/qq1010885678/article/details/50751007

    当然,首先应当查看的,应该是官方文档的介绍:点击查看

  2.代码

package com.mr.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

/**
 * mapper
 *
 * @author zcc ON 2018/2/5
 **/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> infoMap = new HashMap<>();
    Text k = new Text();
    /**
     * 启动之前进行一些必要的初始化工作
     * @param context 上下文
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));
        String path = context.getCacheFiles()[0].getPath();
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
        String line;
        while (StringUtils.isNotEmpty(line = br.readLine())) {
            String[] fields = line.split(",");
            // 将字典加载进入map
            infoMap.put(fields[0], fields[1]);
        }
        // 关闭流
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String orderLine = value.toString();
        // 切分订单信息
        String[] fields = orderLine.split("\t");
        String pName = infoMap.get(fields[1]);
        k.set(orderLine + "\t" + pName);
        // 写出去
        context.write(k, NullWritable.get());
    }
}
MapJoinMapper

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-09-13
  • 2022-12-23
  • 2022-12-23
  • 2021-12-01
  • 2022-12-23
  • 2021-11-29
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-04-10
  • 2021-10-30
  • 2021-11-07
相关资源
相似解决方案