【问题标题】:reducebykey in pyspark with multiple key fields in tuplepyspark中的reducebykey,元组中有多个关键字段
【发布时间】:2021-10-26 23:37:10
【问题描述】:
from pyspark import SparkContext, SparkConf

import sys

 
conf = SparkConf().setAppName("test")

sc = SparkContext(conf=conf)

from operator import add

def convertion(num):

    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):

    if strs == 'apple':

        return -num

    return num

rdd = sc.parallelize([

    {'user':'user','tpe':'apple','timstamp':1500000000,'amount':1},

    {'user':'user','tpe':'pear','timstamp':1500000001,'amount':2},

    {'user':'user2','tpe':'apple','timstamp':1505000002,'amount':3}

])

rdd = rdd.map(lambda x: ((x['user'],convertion(x['timstamp'])),compute(x['tpe'],x['amount'])))

rdd.reduceByKey(lambda x, y: x+y).take(3)

print(rdd.collect())

输出错误:[(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]

我希望输出为: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]

我认为我没有正确使用 reducebykey,有人可以告诉我如何根据键元组对它们进行分组吗?

谢谢!

【问题讨论】:

    标签: python pyspark lambda reduce


    【解决方案1】:

    reduceByKey(像所有 Spark 转换一样)返回一个 new rdd。这个新的 rdd 没有分配给变量,因此没有执行转换。

    在最后一行调用rdd.collect()时,变量rdd仍然引用rdd = rdd.map(...)创建的rdd,并打印map调用后的内容。

    reduceByKey 的结果应该分配给一个变量,take(3) 应该被删除:

    rdd = rdd.map(lambda x: ((x['user'],convertion(x['timstamp'])),compute(x['tpe'],x['amount'])))
    
    rdd = rdd.reduceByKey(lambda x, y: x+y)
    
    print(rdd.collect())
    

    【讨论】:

      猜你喜欢
      • 2015-10-17
      • 2015-07-02
      • 1970-01-01
      • 2016-12-27
      • 2015-12-09
      • 1970-01-01
      • 2018-08-07
      • 1970-01-01
      • 2018-06-27
      相关资源
      最近更新 更多