【Question Title】:PySpark RDD with Typed List convert to DataFrame
【Posted】:2018-01-22 06:28:44
【Question】:

I have an RDD in the following format:

 [(1, 
 (Rating(user=1, product=3, rating=0.99), 
  Rating(user=1, product=4, rating=0.91),  
  Rating(user=1, product=9, rating=0.68))),   
  (2, 
 (Rating(user=2, product=11, rating=1.01), 
  Rating(user=2, product=12, rating=0.98), 
  Rating(user=2, product=45, rating=0.97))), 
  (3, 
 (Rating(user=3, product=23, rating=1.01), 
  Rating(user=3, product=34, rating=0.99), 
  Rating(user=3, product=45, rating=0.98)))]

I haven't been able to find any examples of handling this kind of named data with map, lambda, etc. Ideally, I'd like the output to be a DataFrame in the following format:

User    Ratings
1       3,0.99|4,0.91|9,0.68
2       11,1.01|12,0.98|45,0.97
3       23,1.01|34,0.99|45,0.98

Any pointers would be much appreciated. Note that the number of ratings is variable, not always 3.

【Question Comments】:

Tags: python apache-spark pyspark spark-dataframe rdd


【Solution 1】:

With the RDD defined as

from pyspark.mllib.recommendation import Rating

rdd = sc.parallelize([
    (1,
        (Rating(user=1, product=3, rating=0.99), 
        Rating(user=1, product=4, rating=0.91),  
        Rating(user=1, product=9, rating=0.68))),   
    (2, 
        (Rating(user=2, product=11, rating=1.01), 
        Rating(user=2, product=12, rating=0.98), 
        Rating(user=2, product=45, rating=0.97))), 
    (3, 
        (Rating(user=3, product=23, rating=1.01), 
        Rating(user=3, product=34, rating=0.99), 
        Rating(user=3, product=45, rating=0.98)))])

you can mapValues with list:

df = rdd.mapValues(list).toDF(["User", "Ratings"])

df.printSchema()
# root
#  |-- User: long (nullable = true)
#  |-- Ratings: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- user: long (nullable = true)
#  |    |    |-- product: long (nullable = true)
#  |    |    |-- rating: double (nullable = true)

Or provide a schema:

df = spark.createDataFrame(rdd, "struct<User:long,ratings:array<struct<user:long,product:long,rating:double>>>")


df.printSchema()
# root
#  |-- User: long (nullable = true)
#  |-- ratings: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- user: long (nullable = true)
#  |    |    |-- product: long (nullable = true)
#  |    |    |-- rating: double (nullable = true)
# 

df.show()
# +----+--------------------+
# |User|             ratings|
# +----+--------------------+
# |   1|[[1,3,0.99], [1,4...|
# |   2|[[2,11,1.01], [2,...|
# |   3|[[3,23,1.01], [3,...|
# +----+--------------------+

If you want to drop the user field:

df_without_user = spark.createDataFrame(
    rdd.mapValues(lambda xs: [x[1:] for x in xs]),
    "struct<User:long,ratings:array<struct<product:long,rating:double>>>"
)

If you want to format the column as a single string, you have to use a udf:

from pyspark.sql.functions import udf

@udf                                                                 
def format_ratings(ratings):
    return "|".join(",".join(str(_) for _ in r[1:]) for r in ratings)


df.withColumn("ratings", format_ratings("ratings")).show(3, False)

# +----+-----------------------+
# |User|ratings                |
# +----+-----------------------+
# |1   |3,0.99|4,0.91|9,0.68   |
# |2   |11,1.01|12,0.98|45,0.97|
# |3   |23,1.01|34,0.99|45,0.98|
# +----+-----------------------+

How the "magic" works:

  • Iterate over the array of ratings

    (... for r in ratings)
    
  • For each rating, drop the first field and convert the remaining fields to str

    (str(_) for _ in r[1:])
    
  • Join the fields of each rating with a "," separator:

    ",".join(str(_) for _ in r[1:])
    
  • Join all the rating strings with |

    "|".join(",".join(str(_) for _ in r[1:]) for r in ratings)
    

Alternative implementation:

@udf                                                                 
def format_ratings(ratings):
    return "|".join("{},{}".format(r.product, r.rating) for r in ratings)
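The string-building steps above can be checked without a Spark cluster; here is a minimal pure-Python sketch of the same join logic, with plain (user, product, rating) tuples standing in for the Rating rows:

```python
# Pure-Python sketch of the format_ratings logic: plain (user, product, rating)
# tuples stand in for pyspark.mllib Rating rows, so no Spark session is needed.
def format_ratings(ratings):
    # Drop the first field (user) of each rating, join the rest with ",",
    # then join all the per-rating strings with "|".
    return "|".join(",".join(str(f) for f in r[1:]) for r in ratings)

ratings = [(1, 3, 0.99), (1, 4, 0.91), (1, 9, 0.68)]
print(format_ratings(ratings))  # → 3,0.99|4,0.91|9,0.68
```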

【Discussion】:

  • If you downvote, please leave a comment to help the author and other users understand what could be improved
  • One last question for bonus points.. How would I adjust the precision of the rating from 0.987 to 0.9? I assume it's related to the schema near the beginning, but actually this line: df = spark.createDataFrame(rdd, "struct<User:long,ratings:array<struct<user:long,product:long,rating:double>>>") doesn't seem to work for me. I used rdd.toDF() before.
  • You can use the format "{},{:.1f}".format(r.product, r.rating)
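The precision tweak suggested in that last comment can be sketched the same way; the "{:.1f}" format spec rounds the rating to one decimal place (plain tuples again stand in for Rating rows):

```python
# Sketch of the one-decimal-place variant: "{:.1f}" rounds the rating,
# e.g. 0.987 -> "1.0". Tuples (user, product, rating) stand in for Rating rows.
def format_ratings_1dp(ratings):
    return "|".join("{},{:.1f}".format(r[1], r[2]) for r in ratings)

print(format_ratings_1dp([(1, 3, 0.987), (1, 4, 0.91)]))  # → 3,1.0|4,0.9
```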