【发布时间】:2015-02-18 14:42:23
【问题描述】:
有这样的地图:
K1 -> [V1, V2]
K2 -> [V2, V3]
K3 -> [V3]
K4 -> [V4]
因此,我希望获得分组键列表,这些键至少具有值列表中的一个公共元素。解决方案应支持传递关系(G1 组):
G1 = [K1, K2, K3]
G2 = [K4]
我遇到了here 描述的错误。在 Spark 中如何实现?
我的代码如下:
public class Grouping implements Serializable {
public void group(JavaSparkContext sc) {
List<Mapping> list = newArrayList();
list.add(new Mapping("K1", newArrayList("V1", "V2")));
list.add(new Mapping("K2", newArrayList("V2", "V3")));
list.add(new Mapping("K3", newArrayList("V3")));
list.add(new Mapping("K4", newArrayList("V4")));
JavaRDD<Tuple2<Mapping, String>> pairs = sc.parallelize(list).map(Mapping::toPairs).flatMap(p -> p);
JavaPairRDD<String, Iterable<Mapping>> valuesToMappings = pairs.groupBy(Tuple2::_2).
mapToPair(t -> new Tuple2<>(t._1, stream(t._2).map(tt -> tt._1).collect(toList())));
JavaRDD<Group> map = valuesToMappings.map(t -> new Group(traverse(newHashSet(t._2.iterator()), valuesToMappings)));
System.out.println(map.collect());
}
private Set<Mapping> traverse(Set<Mapping> mappings, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
Set<String> values = mappings.stream().map(Mapping::getValues).flatMap(Collection::stream).collect(toSet());
Set<Mapping> mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
while (!mappings.equals(mappingsHavingValues)) {
mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
}
return mappingsHavingValues;
}
private Set<Mapping> mappingsHavingValues(Set<String> values, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
Set<Mapping> result = newHashSet();
for (String value : values) {
List<Iterable<Mapping>> lookup = valuesToMappings.lookup(value);
result.addAll(newArrayList(lookup.get(0))); //here I get an exception
}
return result;
}
public <T> Stream<T> stream(Iterable<T> in) {
return StreamSupport.stream(in.spliterator(), false);
}
}
public class Mapping implements Serializable {
private String key;
private List<String> values;
public Mapping(String key, List<String> values) {
this.key = key;
this.values = values;
}
public String getKey() {
return key;
}
public List<String> getValues() {
return values;
}
public List<Tuple2<Mapping, String>> toPairs() {
return getValues().stream().map(v -> new Tuple2<>(this, v)).collect(toList());
}
}
public class Group {
private Set<Mapping> mappings;
public Group(Set<Mapping> mappings) {
this.mappings = mappings;
}
public Set<Mapping> getMappings() {
return mappings;
}
}
【问题讨论】:
-
为什么 K1,2,3 组合在一起?不会是 K1/2 和 K2/3 吗?
-
链接的问题有一个很好的答案。你为什么卡住了?你的代码是什么样的?
-
@JustinPihony No. K1 和 K2 有 V2,但 K2 通过 V3 连接到 K3。
-
@DanielDarabos 我添加了我的代码。我寻找一个提示,我可以如何将 while 循环替换为 spark-way 解决方案。
-
是的,但是 K1 没有连接到 K3..所以它不应该在同一个列表中。您的规格需要调整 IMO,但我明白了 :)
标签: java apache-spark rdd