【问题标题】:spark python reduced by keyspark python按键减少
【发布时间】:2014-10-29 14:03:01
【问题描述】:

我想计算哪些用户查看哪个类别的频率。我是 SparkPython 的新手。这是演示数据:

dataSource = sc.parallelize( [("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")] )

我按关键用户减少了这个并收集了所有类别。然后我分头数到以后:

dataReduced = dataSource.reduceByKey(lambda x,y : x + "," + y)
catSplitted = dataReduced.map(lambda (user,values) : (values.split(","),user))

每个用户的输出格式如下 -> ([cat1,cat1,cat2,catn], user)

谁能告诉我如何用 SparkPython 计算类别,或者你有其他方法来解决这个问题吗?

【问题讨论】:

    标签: python apache-spark


    【解决方案1】:

    现在我得到了我期望的结果。但我想像我一样连接钥匙是不利的。也许有人有其他解决方案或任何建议?

    # count the categorie views per user
    # data
    dataSource = sc.parallelize( [("user1", "film"), ("user1", "film"), ("user2", "film"), ("user2", "books"), ("user2", "books")] )
    # Create Key,Value | concatenate user and category as key
    dataKeyValue = dataSource.map(lambda (user,category) : (user+","+category, 1))
    # reduce 
    dataReduced = dataKeyValue.reduceByKey(lambda x,y : x + y)
    # result => [('user2,books', 2), ('user1,film', 2), ('user2,film', 1)]
    # split key
    cleanResult = dataReduced.map(lambda (key,value) : (key.split(","),value))
    

    【讨论】:

      【解决方案2】:

      在纯python中:

      ds = [('user1',['film','film','books']), ('user2',['film','books','books'])]
      ds1 = map(lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y))),ds)
      print ds1
      

      返回:

      [('user1', (('film', 2), ('books', 1))), ('user2', (('film', 1), ('books', 2)))]
      

      在spark中,应该如下(不确定,因为我现在无法访问spark):

      dataReduced = dataSource.reduceByKey(lambda x,y : x + "," + y)
      catSplitted = dataReduced.map(lambda (user,values) : (user, values.split(","))
      catCounted = catSplitted.map(lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y)))
      

      希望这会有所帮助。如果没有,您可以尝试检查如何使用 spark 命令获取 python 功能。基本逻辑应该可以工作

      【讨论】:

      • 谢谢,这似乎有效。您能否简要解释一下您的最后一个 lambda 函数。哪个变量代表哪个值 - lambda (x,y):(x,tuple(set((z,y.count(z)) for z in y))))
      • 我正在列表中创建一个 cat 元组和 cat 计数的元组,因此 ['film', 'film', 'books'] 变为 (('film',2),('电影',2),('书',1))。然后 set 删除重复项,然后将 set 转换回元组。
      【解决方案3】:

      另一个(更高效且易于阅读的 IMO)。 高效,因为您 SPARK DAG 在基于用户的分区之后不需要为重新分区类别收集,并且易于使用,因为它使用数据帧,而不是 RDD。

      首先,根据用户和类别制作一个哈希列:

      import pyspark.sql.functions as F
      df = spark.createDataFrame([("u1", "f"), ("u1", "f"), ("u2", "f"), ("u2", "b"), ("u2", "b")], schema=['u', 'c'])
      df = df.withColumn('hash', f.hash())
      

      其次,我们通过哈希进行分区,并在本地聚合:

      from pyspark.sql import Window
      win = Window.partitionBy('hash')
      df.withColumns('count', F.count('hash').over(win)).distinct().show()
      

      【讨论】:

        猜你喜欢
        • 2015-07-08
        • 1970-01-01
        • 2018-05-08
        • 1970-01-01
        • 1970-01-01
        • 2017-04-05
        • 2019-06-11
        • 2016-01-22
        • 2018-02-20
        相关资源
        最近更新 更多