【问题标题】:Convert Nested dictionary to Pyspark Dataframe将嵌套字典转换为 Pyspark 数据框
【发布时间】:2020-11-01 12:28:38
【问题描述】:

向程序员同行问好。

我最近开始使用 pyspark 并且来自熊猫背景。我需要计算数据中用户的相似度。由于我无法从 pyspark 中找到,我求助于使用 python 字典来创建相似性数据框。

但是,我没有将嵌套字典转换为 pyspark 数据框的想法。 您能否为我提供一个实现这一预期结果的方向。

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from scipy.spatial import distance


spark = SparkSession.builder.getOrCreate()

from pyspark.sql import *

traindf = spark.createDataFrame([
    ('u11',[1, 2, 3]),
    ('u12',[4, 5, 6]),
    ('u13',[7, 8, 9])
]).toDF("user","rating")

traindf.show()

输出

+----+---------+
|user|   rating|
+----+---------+
| u11|[1, 2, 3]|
| u12|[4, 5, 6]|
| u13|[7, 8, 9]|
+----+---------+

它想生成用户之间的相似性并将其放入 pyspark 数据帧中。

parent_dict = {}
for parent_row in traindf.collect():
#     print(parent_row['user'],parent_row['rating'])
    child_dict = {}
    for child_row in traindf.collect():
        similarity = distance.cosine(parent_row['rating'],child_row['rating'])
        child_dict[child_row['user']] = similarity
    parent_dict[parent_row['user']] = child_dict

print(parent_dict)

输出:

{'u11': {'u11': 0.0, 'u12': 0.0253681538029239, 'u13': 0.0405880544333298},
 'u12': {'u11': 0.0253681538029239, 'u12': 0.0, 'u13': 0.001809107314273195},
 'u13': {'u11': 0.0405880544333298, 'u12': 0.001809107314273195, 'u13': 0.0}}

我想从这本字典中构造一个 pyspark 数据框。

+-----+-----+--------------------+
|user1|user2|          similarity|
+-----+-----+--------------------+
|  u11|  u11|                 0.0|
|  u11|  u12|  0.0253681538029239|
|  u11|  u13|  0.0405880544333298|
|  u12|  u11|  0.0253681538029239|
|  u12|  u12|                 0.0|
|  u12|  u13|0.001809107314273195|
|  u13|  u11|  0.0405880544333298|
|  u13|  u12|0.001809107314273195|
|  u13|  u13|                 0.0|
+-----+-----+--------------------+

到目前为止,我尝试将 dict 转换为 pandas 数据帧并将其转换为 pyspark 数据帧。但是,我需要大规模地执行此操作,并且我正在寻找更闪亮的方式来执行此操作。

parent_user = []
child_user = []
child_similarity = []

for parent_row in traindf.collect():
    
    for child_row in traindf.collect():
        similarity = distance.cosine(parent_row['rating'],child_row['rating'])
        child_user.append(child_row['user'])
        child_similarity.append(similarity)
        parent_user.append(parent_row['user'])

my_dict = {}
my_dict['user1'] = parent_user
my_dict['user2'] = child_user
my_dict['similarity'] = child_similarity

import pandas as pd

pd.DataFrame(my_dict)
df = spark.createDataFrame(pd.DataFrame(my_dict))
df.show()

输出:

+-----+-----+--------------------+
|user1|user2|          similarity|
+-----+-----+--------------------+
|  u11|  u11|                 0.0|
|  u11|  u12|  0.0253681538029239|
|  u11|  u13|  0.0405880544333298|
|  u12|  u11|  0.0253681538029239|
|  u12|  u12|                 0.0|
|  u12|  u13|0.001809107314273195|
|  u13|  u11|  0.0405880544333298|
|  u13|  u12|0.001809107314273195|
|  u13|  u13|                 0.0|
+-----+-----+--------------------+

【问题讨论】:

  • 你知道如何将字典转换为平面熊猫数据框吗?然后你可以这样做,然后创建 pyspark 数据框。或者您可以将顶级 dict 转换为 key,dict 列表,将其转换为 pyspark 数据帧,然后将 explode 应用于 dict 列
  • @YaroslavFyodorov 我已经尝试过了,我已经更新了我的问题。但是我正在寻找更多的火花方式来做到这一点。请指导我采用可以扩展到一百万条记录的方法。谢谢。

标签: python pandas pyspark


【解决方案1】:

也许你可以这样做:

import pandas as pd
from pyspark.sql import SQLContext

my_dic = {'u11': {'u11': 0.0, 'u12': 0.0253681538029239, 'u13': 0.0405880544333298},
                 'u12': {'u11': 0.0253681538029239, 'u12': 0.0, 'u13': 0.001809107314273195},
                 'u13': {'u11': 0.0405880544333298, 'u12': 0.001809107314273195, 'u13': 0.0}}

df =  pd.DataFrame.from_dict(my_dic).unstack().to_frame().reset_index()
df.columns = ['user1', 'user2', 'similarity']
sqlCtx = SQLContext(sc) # sc is spark context
sqlCtx.createDataFrame(df).show()

【讨论】:

  • 感谢您的回复。您能否提出一些不涉及转换为 pandas 数据框的方法。
  • @vedprakash 你在你的 Spark 数据帧上做了一个 collect() 并将所有数据带到你的主节点中,即使你设法使用 spark 进行这种转换,我认为你的问题仍然存在并且你的方法没有t 可以很好地扩展更多数据
  • 我建议您首先重新设计您所做的事情的方法,然后考虑如何将您的数据转换为所需的格式。
  • 我完全同意你的 cmets 。如何重新设计是我一无所知的地方,基本上我的方法是蛮力。
  • @vedprakash 看看这个:github.com/PacktPublishing/…我想你想做这样的事情,源代码可读性很强,而且 MovieLens 数据集是公开的
【解决方案2】:

好的,现在你的问题更清楚了。我假设您从用户评分的火花数据框开始。 你想要做的是这个 DF 与其自身的外部连接,这将创建一个包含所有可能的用户对(及其评级)的交叉产品,包括重复两次的同一用户行(可以稍后过滤)然后计算包含相似度的新列。

【讨论】:

  • 是的,你完全正确。我已经写下了这个解决方案。这对我有用,但是如果可以改进,那会让我很高兴。
【解决方案3】:
from pyspark.sql.types import *
import pyspark.sql.functions as psf

def cos_sim(a,b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

dot_udf = psf.udf(lambda x,y: cos_sim(x,y), FloatType())

data.alias("i").join(data.alias("j"), psf.col("i.user") != psf.col("j.user"))\
    .select(
        psf.col("i.user").alias("user1"), 
        psf.col("j.user").alias("user2"), 
        dot_udf("i.rating", "j.rating").alias("similarity"))\
    .sort("similarity")\
    .show()

输出如你所愿:

+-----+-----+----------+
|user1|user2|similarity|
+-----+-----+----------+
|  u11|  u12|0.70710677|
|  u13|  u11|0.70710677|
|  u11|  u13|0.70710677|
|  u12|  u11|0.70710677|
|  u12|  u13|       1.0|
|  u13|  u12|       1.0|
+-----+-----+----------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-05-10
    • 1970-01-01
    • 2021-11-28
    • 2022-01-07
    • 2020-11-28
    • 2021-08-30
    • 2018-08-14
    相关资源
    最近更新 更多