【问题标题】:Calculating the cosine similarity between all the rows of a dataframe in pyspark计算pyspark中数据框所有行之间的余弦相似度
【发布时间】:2018-03-27 07:02:05
【问题描述】:

我有一个数据集,其中包含员工的人口统计信息,例如年龄性别、地址等以及他们的工作地点。我从数据集中创建了一个 RDD 并将其转换为 DataFrame。

每个 ID 有多个条目。因此,我创建了一个 DataFrame,其中仅包含工人的 ID 和他/她工作过的各个办公地点。

    |----------|----------------|
    | **ID**    **Office_Loc**  |
    |----------|----------------|
    |   1      |Delhi, Mumbai,  |
    |          | Gandhinagar    |
    |---------------------------|
    |   2      | Delhi, Mandi   | 
    |---------------------------|
    |   3      |Hyderbad, Jaipur|
    -----------------------------

我想根据每个员工的办公地点计算每个员工与其他员工的余弦相似度。

所以,我遍历了 DataFrame 的行,从 DataFrame 中检索了一行:

myIndex = 1
values = (ID_place_df.rdd.zipWithIndex()
            .filter(lambda ((l, v), i): i == myIndex)
            .map(lambda ((l,v), i): (l, v))
            .collect())

然后使用地图

    cos_weight = ID_place_df.select("ID","office_location").rdd\
  .map(lambda x: get_cosine(values,x[0],x[1]))

计算提取的行与整个DataFrame之间的余弦相似度。

我认为我的方法不是一个好的方法,因为我正在遍历 DataFrame 的行,它违背了使用 spark 的全部目的。 在pyspark中有更好的方法吗? 请多多指教。

【问题讨论】:

  • 我觉得这个问题有点长。通常最好用最简单的情况询问问题,您会遇到同样的问题。

标签: python dataframe pyspark cosine-similarity


【解决方案1】:

您可以使用 mllib 包计算每行的 TF-IDF 的 L2 范数。然后将该表与自身相乘,得到余弦相似度,即 2 乘以 2 L2norms 的点积:

1. RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
  • 计算TF-IDF:

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))
    
    from pyspark.mllib.feature import HashingTF, IDF
    hashingTF = HashingTF()
    tf = hashingTF.transform(documents)
    

您可以在HashingTF 中指定特征的数量,以使特征矩阵更小(更少的列)。

    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
  • 计算L2norm:

    from pyspark.mllib.feature import Normalizer
    labels = rdd.map(lambda l: l[0])
    features = tfidf
    
    normalizer = Normalizer()
    data = labels.zip(normalizer.transform(features))
    
  • 通过将矩阵与自身相乘来计算余弦相似度:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix
    mat = IndexedRowMatrix(data).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    
        array([[ 0.        ,  0.        ,  0.        ,  0.        ],
               [ 0.        ,  1.        ,  0.10794634,  0.        ],
               [ 0.        ,  0.10794634,  1.        ,  0.        ],
               [ 0.        ,  0.        ,  0.        ,  1.        ]])
    

    或:在 numpy 数组上使用笛卡尔积和函数 dot

    data.cartesian(data)\
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
        .sortByKey()\
        .collect()
    
        [((1, 1), 1.0),
         ((1, 2), 0.10794633570596117),
         ((1, 3), 0.0),
         ((2, 1), 0.10794633570596117),
         ((2, 2), 1.0),
         ((2, 3), 0.0),
         ((3, 1), 0.0),
         ((3, 2), 0.0),
         ((3, 3), 1.0)]
    

2。数据框

由于您似乎在使用数据框,因此您可以改用 spark mlpackage:

import pyspark.sql.functions as psf
df = rdd.toDF(["ID", "Office_Loc"])\
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
  • 计算 TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
    tf = hashingTF.transform(df)
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
    tfidf = idf.transform(tf)
    
  • 计算L2 norm:

    from pyspark.ml.feature import Normalizer
    normalizer = Normalizer(inputCol="feature", outputCol="norm")
    data = normalizer.transform(tfidf)
    
  • 计算矩阵乘积:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\
            .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    

    或: 使用连接和 UDF 函数 dot

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType())
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
        .select(
            psf.col("i.ID").alias("i"), 
            psf.col("j.ID").alias("j"), 
            dot_udf("i.norm", "j.norm").alias("dot"))\
        .sort("i", "j")\
        .show()
    
        +---+---+-------------------+
        |  i|  j|                dot|
        +---+---+-------------------+
        |  1|  2|0.10794633570596117|
        |  1|  3|                0.0|
        |  2|  3|                0.0|
        +---+---+-------------------+
    

本教程列出了乘以大规模矩阵的不同方法:https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e

【讨论】:

  • 感谢您的回答。我真的很感激帮助。但是代码给了我一个错误requirement failed: The input column must be ArrayType, but got StringType.'。在使用数据帧的 hashingTF 转换期间。
  • 您必须先将字符串列表拆分为单词列表。我添加了关于如何创建 df 的部分
  • 嗨,当我使用data.cartesian(data)\ .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\ .sortByKey()\ .take(5) 时它可以工作。但是当我使用 mllib 代码并将 blockMatrix 转换为 LocalMatrix 时,它给了我 u'requirement failed: The length of the values array must be less than Int.MaxValue. Currently numRows * numCols: 1006095879729669481' 我不明白,因为我正在获取一小部分数据(大约 10 个 ID)所以 numRows * numCols:100。跨度>
  • 尝试将 numFeatures 设置为您在数据框中拥有的不同城市的数量,默认情况下它是 262144 这将是您的块矩阵中的列数(我将其设置为 10您提供的样本数据)。 cartesian joindot product 也可以。查看大矩阵乘法的链接
  • 如何设置numFeatures?我将其设置为hashingTF = HashingTF(numFeatures=20,inputCol="Business", outputCol="tf")。但块矩阵仍然有 1003043309L 列和行。但是对于问题中给出的小例子,我没有那个问题
【解决方案2】:

关于这个问题,由于我在pyspark的一个项目中工作,我必须使用余弦相似度,我不得不说@MaFF的代码是正确的,确实,当我看到他的时候我犹豫了代码,由于他使用向量的L2范数的点积,而理论说:数学上,它是向量的点积与幅度乘积的比率两个向量。

这是我的代码改编成相同的结果,所以我得出的结论是 SKLearn 以不同的方式计算 tfidf,所以如果你尝试使用 sklearn 重播这个练习,你会得到不同的结果。

d = [{'id': '1', 'office': 'Delhi, Mumbai, Gandhinagar'}, {'id': '2', 'office': 'Delhi, Mandi'}, {'id': '3', 'office': 'Hyderbad, Jaipur'}]
df_fussion = spark.createDataFrame(d)
df_fussion = df_fussion.withColumn('office', F.split('office', ', '))


from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="office", outputCol="tf")
tf = hashingTF.transform(df_fussion)

idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
data = idf.transform(tf)   

@udf
def sim_cos(v1,v2):
    try:
        p = 2
        return float(v1.dot(v2))/float(v1.norm(p)*v2.norm(p))
    except:
        return 0

result = data.alias("i").join(data.alias("j"), F.col("i.ID") < F.col("j.ID"))\
    .select(
        F.col("i.ID").alias("i"),
        F.col("j.ID").alias("j"),
        sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
    .sort("i", "j")
result.show()

我还想与您分享一些简单的测试,我使用简单的向量进行了结果正确的测试:

亲切的问候,

【讨论】:

  • 感谢您分享代码。请问您一直在处理的数据集有多大?计算具有 1000 个特征和 10000 行的 df 的相似度需要相当长的时间。我想知道有没有办法加快速度?
  • 我必须处理大约 100k 行的数据集。如果你想将它们相互比较,你知道N行的可能伙伴的数量是N*(N-1)/2,这意味着,5*(10**9)种可能的组合。因此,我建议您将您认为对隔离小行组具有代表性的子特征进行分组,并将此技术应用于这些组。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-04-02
  • 2017-03-19
  • 1970-01-01
  • 2019-07-21
  • 1970-01-01
  • 2015-05-24
相关资源
最近更新 更多