【发布时间】:2014-03-07 10:20:37
【问题描述】:
我目前正在探索布隆过滤器。我浏览了大部分关于bloom fitlers的博客,知道什么是,但仍然无法找出一个关于万一连接的例子。
每篇文章都说它会减少网络 I/O,但都没有说明如何?特别是 http://vanjakom.wordpress.com/tag/distributed-cache/ 很好,但它看起来和我刚开始使用 map reduce 一样复杂。
谁能帮我在下面的例子中实现布隆过滤器(reduceside join)
2个mapers读取用户记录和部门记录,reducer加入
用户记录
身份证,姓名
3738,里奇·戈尔
12946,罗尼·萨姆
17556,大卫·加特
3443,雷切尔·史密斯
5799,保罗·罗斯塔
部门记录
3738,销售
12946,市场营销
17556,市场营销
3738,销售
3738,销售
代码
public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{
private Text outkey = new Text();
private Text outval = new Text();
private String id, name;
public void map (LongWritable key, Text value, OutputCollector<Text, Text> ouput,Reporter reporter)
throws IOException {
String line = value.toString();
String arryUsers[] = line.split(",");
id = arryUsers[0].trim();
name = arryUsers[1].trim();
outkey.set(id);
outval.set("A"+ name);
ouput.collect(outkey, outval);
}
}
public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private Text Outk = new Text();
private Text Outv = new Text();
String depid, dep ;
public void map (LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String arryDept[] = line.split(",");
depid = arryDept[0].trim();
dep = arryDept[1].trim();
Outk.set(depid);
Outv.set("B" + dep);
output.collect(Outk, Outv);
}
}
和减速器
ublic class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
private Text tmp = new Text();
private ArrayList<Text> listA = new ArrayList<Text>();
private ArrayList<Text> listB = new ArrayList<Text>();
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException {
listA.clear();
listB.clear();
while (values.hasNext()) {
tmp = values.next();
if (tmp.charAt(0) == 'A') {
listA.add(new Text(tmp.toString().substring(1)));
} else if (tmp.charAt(0) == 'B') {
listB.add(new Text(tmp.toString().substring(1)));
}
}
executejoinlogic(output);
}
private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException {
if (!listA.isEmpty() && !listB.isEmpty()) {
for (Text A : listA) {
for (Text B : listB) {
output.collect(A, B);
}
}
}
}
}
上述场景可以实现布隆过滤器吗?
如果是,那么请帮我实现这个?
【问题讨论】:
标签: java hadoop mapreduce bloom-filter