【问题标题】:spark, small issue about reduceByKeyspark,关于 reduceByKey 的小问题
【发布时间】:2016-02-16 21:13:05
【问题描述】:

我是 Spark 的新手。我正在尝试实现 tf-idf。我需要计算每个单词在每个文档中出现的次数以及每个文档中的总单词数。

我想进行 reduce 并可能进行另一个操作,但我还不知道如何操作。 这是我的输入:

对的形式为(documentName , (word, wordCount)) ex。

("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)),
    ("doc2",("have", 5))

键是文档,值是单词,以及该单词在该文档中出现的次数。我想计算每个文档中的总词数,并可能计算该词的百分比。

我想要的输出:

("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)),
    ("doc2", (("a", 5),10)), ("doc2", (("have", 5),10))

我得到了效果

corpus.join(corpus.reduceByKey(lambda x, y : x[1]+y[1]))

起点:

collect_of_docs = [(doc1,text1), (doc2, text2),....]

def count_words(x):
    l = []
    words = x[1].split()
    for w in words:
        l.append(((w, x[0]), 1))
    return l

sc = SparkContext() 
corpus = sc.parallelize(collect_of_docs)
input = (corpus
    .flatMap(count_words)
    .reduceByKey(add)
    .map(lambda ((x,y), z) : (y, (x,z))))

如果可能的话,我可能只想用一个棘手的运算符进行一次 reduce 操作。任何帮助表示赞赏:) 提前致谢。

【问题讨论】:

    标签: python apache-spark pyspark reduce tf-idf


    【解决方案1】:

    一般来说,flatMap 只是为了稍后收集您的数据是没有意义的。我假设您的数据或多或少是这样的:

    collect_of_docs = sc.parallelize([
        (1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."),
        (2, "Mauris magna sem, vehicula sed dictum finibus, posuere id ipsum."),
        (3, "Duis eleifend molestie dolor, quis fringilla eros facilisis ac.")])
    

    首先,我们需要一些使用基本正则表达式和 Counter 的助手:

    from __future__ import division  # If for some reason you use Python 2.x
    import re
    from collections import Counter
    
    def count_words(doc, pattern=re.compile("\w+")):
        """Given a tuple (doc_id, text)
        return a tuple (doc_id, tokens_count
    
        >>> count_words((1, "Foo bar bar."))
        (1, Counter({'Foo': 1, 'bar': 2}))
        """
        (doc_id, text) = doc
        return (doc_id, Counter(pattern.findall(text))) 
    
    def compute_tf(cnt):
        """Convert term counter to term frequency
    
        >>> compute_tf(Counter({'Foo': 1, 'bar': 2}))
        {'Foo': 0.3333333333333333, 'bar': 0.6666666666666666}
        """
        n = sum(cnt.values())
        return {k: v / n for (k, v) in cnt.items()}
    

    以及最终结果:

    tfs = (collect_of_docs
        .map(count_words)
        .mapValues(compute_tf))
    
    tfs.sortByKey().first()
    
    ## (1,
    ##  {'Lorem': 0.125,
    ##   'adipiscing': 0.125,
    ##   'amet': 0.125,
    ##   'consectetur': 0.125,
    ##   'dolor': 0.125,
    ##   'elit': 0.125,
    ##   'ipsum': 0.125,
    ##   'sit': 0.125})
    

    使用上述文档频率可以计算如下:

    from operator import add
    
    dfs = (tfs
        .values()
        .flatMap(lambda kv: ((k, 1) for k in kv.keys()))
        .reduceByKey(add))
    
    dfs.sortBy(lambda x: -x[1]).take(5)
    
    ## [('ipsum', 2),
    ##  ('dolor', 2),
    ##  ('consectetur', 1),
    ##  ('finibus', 1),
    ##  ('fringilla', 1)]
    

    【讨论】:

    • 是的,这就是我想要的:)) 出于好奇,我想再问一件事。假设我想计算每个单词的 tf-idf 分数,并以 doc1 word1 score1 word2 score2 的形式展示它们; doc2 word1 score1 word2 score2 word3 score3;..........这里最有效的方法是什么?我想我只是遍历 tfs 键和相应的单词,然后从 dfs 中查找单词 idf。你会怎么做?
    • 视情况而定。如果唯一术语的数量相对较少,那么您可以收集 dfs、广播为 dict 和 flatMap over tfs。否则我会将 tfs flatMap 到(term, (doc_id, freq))) 并加入dfs
    • 即使看起来不像,也有很大的不同。使用广播变量不需要改组。所以这是一个本地操作。如果 dfs 很大,那么广播会成为一个限制因素,而 shuffle / hash join 会成为更好的解决方案。
    猜你喜欢
    • 1970-01-01
    • 2015-12-03
    • 2016-12-30
    • 2014-12-21
    • 1970-01-01
    • 2012-07-27
    • 2016-02-25
    • 2019-03-05
    • 1970-01-01
    相关资源
    最近更新 更多