【问题标题】:bloom filter in reduce side join减少侧连接中的布隆过滤器
【发布时间】:2014-03-07 10:20:37
【问题描述】:

我目前正在探索布隆过滤器。我浏览了大部分关于bloom fitlers的博客,知道什么是,但仍然无法找出一个关于万一连接的例子。

每篇文章都说它会减少网络 I/O,但都没有说明如何?特别是 http://vanjakom.wordpress.com/tag/distributed-cache/ 很好,但它看起来和我刚开始使用 map reduce 一样复杂。

谁能帮我在下面的例子中实现布隆过滤器(reduceside join)

2个mapers读取用户记录和部门记录,reducer加入

用户记录

身份证,姓名

3738,里奇·戈尔

12946,罗尼·萨姆

17556,大卫·加特

3443,雷切尔·史密斯

5799,保罗·罗斯塔

部门记录

3738,销售

12946,市场营销

17556,市场营销

3738,销售

3738,销售

代码

public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{

 private Text outkey = new Text();
 private Text outval = new Text();
 private String id, name;

public void map (LongWritable key, Text value, OutputCollector<Text, Text> ouput,Reporter reporter)
             throws IOException {

     String line = value.toString();
     String arryUsers[] = line.split(",");
     id = arryUsers[0].trim();
     name = arryUsers[1].trim();

     outkey.set(id);
     outval.set("A"+ name);
     ouput.collect(outkey, outval);
   }
    }

public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

private Text Outk = new Text();
private Text Outv = new Text();
String depid, dep ;

public void map (LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

    String line = value.toString();
    String arryDept[] = line.split(",");
    depid = arryDept[0].trim();
    dep = arryDept[1].trim();

    Outk.set(depid);
    Outv.set("B" + dep);

    output.collect(Outk, Outv);
}
    }

和减速器

ublic class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

private Text tmp = new Text();
private ArrayList<Text> listA = new ArrayList<Text>();
private ArrayList<Text> listB = new ArrayList<Text>();

public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException {

    listA.clear();
    listB.clear();

    while (values.hasNext()) {

        tmp = values.next();
        if (tmp.charAt(0) == 'A') {
            listA.add(new Text(tmp.toString().substring(1)));
        } else if (tmp.charAt(0) == 'B') {
            listB.add(new Text(tmp.toString().substring(1)));
        }



    }
    executejoinlogic(output);

}

private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException {

    if (!listA.isEmpty() && !listB.isEmpty()) {
        for (Text A : listA) {
        for (Text B : listB) {
        output.collect(A, B);
        }
        }
         }
    }
          }

上述场景可以实现布隆过滤器吗?

如果是,那么请帮我实现这个?

【问题讨论】:

    标签: java hadoop mapreduce bloom-filter


    【解决方案1】:

    只有当您的两个输入表中的一个比另一个小得多时,您才能在此处实施布隆过滤器。您需要在此处遵循的过程是:

    1. 在 Mapper 类的setup() 方法中初始化一个布隆过滤器(过滤器对象本身应该是全局的,以便以后可以通过map() 方法访问):

      filter = new BloomFilter(VECTOR_SIZE,NB_HASH,HASH_TYPE);

    2. 将较小的表读入Mapper的setup()方法中。

    3. 将每条记录的 ID 添加到布隆过滤器:

      filter.add(ID);

    4. map() 方法本身中,对来自较大输入源的任何 ID 使用 filter.membershipTest(ID)。如果没有匹配,您就知道该 ID 不存在于您的较小数据集中,因此不应将其传递给 reducer。

    5. 请记住,您会在 reducer 中得到误报,因此不要假设所有内容都会被加入。

    【讨论】:

    • 请原谅我再次提出一个老问题,但我相信 Ben 的 #5 是不正确的:我认为 Bloom Filter 不可能产生假阴性?假阳性,是的,但不是假阴性
    猜你喜欢
    • 2015-10-25
    • 1970-01-01
    • 2011-09-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-29
    • 1970-01-01
    相关资源
    最近更新 更多