【问题标题】:Grouping keys by value's list common elements按值分组键列出常见元素
【发布时间】:2015-02-18 14:42:23
【问题描述】:

有这样的地图:

K1 -> [V1, V2]
K2 -> [V2, V3]
K3 -> [V3]
K4 -> [V4]

因此,我希望获得分组键列表,这些键至少具有值列表中的一个公共元素。解决方案应支持传递关系(G1 组):

G1 = [K1, K2, K3]
G2 = [K4]

我遇到了here 描述的错误。在 Spark 中如何实现?

我的代码如下:

public class Grouping implements Serializable {

    public void group(JavaSparkContext sc) {
        List<Mapping> list = newArrayList();
        list.add(new Mapping("K1", newArrayList("V1", "V2")));
        list.add(new Mapping("K2", newArrayList("V2", "V3")));
        list.add(new Mapping("K3", newArrayList("V3")));
        list.add(new Mapping("K4", newArrayList("V4")));

        JavaRDD<Tuple2<Mapping, String>> pairs = sc.parallelize(list).map(Mapping::toPairs).flatMap(p -> p);
        JavaPairRDD<String, Iterable<Mapping>> valuesToMappings = pairs.groupBy(Tuple2::_2).
            mapToPair(t -> new Tuple2<>(t._1, stream(t._2).map(tt -> tt._1).collect(toList())));

        JavaRDD<Group> map = valuesToMappings.map(t -> new Group(traverse(newHashSet(t._2.iterator()), valuesToMappings)));

        System.out.println(map.collect());
    }

    private Set<Mapping> traverse(Set<Mapping> mappings, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<String> values = mappings.stream().map(Mapping::getValues).flatMap(Collection::stream).collect(toSet());
        Set<Mapping> mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        while (!mappings.equals(mappingsHavingValues)) {
            mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        }

        return mappingsHavingValues;
    }

    private Set<Mapping> mappingsHavingValues(Set<String> values, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<Mapping> result = newHashSet();
        for (String value : values) {
            List<Iterable<Mapping>> lookup = valuesToMappings.lookup(value);
            result.addAll(newArrayList(lookup.get(0))); //here I get an exception
        }
        return result;
    }

    public <T> Stream<T> stream(Iterable<T> in) {
        return StreamSupport.stream(in.spliterator(), false);
    }
}

public class Mapping implements Serializable {
    private String key;
    private List<String> values;

    public Mapping(String key, List<String> values) {
        this.key = key;
        this.values = values;
    }

    public String getKey() {
        return key;
    }

    public List<String> getValues() {
        return values;
    }

    public List<Tuple2<Mapping, String>> toPairs() {
        return getValues().stream().map(v -> new Tuple2<>(this, v)).collect(toList());
    }
}

public class Group {

    private Set<Mapping> mappings;

    public Group(Set<Mapping> mappings) {
        this.mappings = mappings;
    }

    public Set<Mapping> getMappings() {
        return mappings;
    }
}

【问题讨论】:

  • 为什么 K1,2,3 组合在一起?不会是 K1/2 和 K2/3 吗?
  • 链接的问题有一个很好的答案。你为什么卡住了?你的代码是什么样的?
  • @JustinPihony No. K1 和 K2 有 V2,但 K2 通过 V3 连接到 K3。
  • @DanielDarabos 我添加了我的代码。我寻找一个提示,我可以如何将 while 循环替换为 spark-way 解决方案。
  • 是的,但是 K1 没有连接到 K3..所以它不应该在同一个列表中。您的规格需要调整 IMO,但我明白了 :)

标签: java apache-spark rdd


【解决方案1】:

您正在寻找图中的连通分量。 org.apache.spark.graphx.lib.ConnectedComponents为此提供了分布式解决方案。

【讨论】:

  • 抱歉缺少代码示例。我现在没有时间。无论如何,希望这会有所帮助!
猜你喜欢
  • 1970-01-01
  • 2012-09-08
  • 1970-01-01
  • 2020-04-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-02-21
相关资源
最近更新 更多