[Posted]: 2021-10-26 23:37:10
[Question]:
from pyspark import SparkContext, SparkConf
import datetime
from operator import add

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)

def convertion(num):
    # Convert a Unix timestamp to a 'YYYY-MM-DD' string (local time)
    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):
    # 'apple' amounts are counted as negative
    if strs == 'apple':
        return -num
    return num

rdd = sc.parallelize([
    {'user': 'user', 'tpe': 'apple', 'timstamp': 1500000000, 'amount': 1},
    {'user': 'user', 'tpe': 'pear', 'timstamp': 1500000001, 'amount': 2},
    {'user': 'user2', 'tpe': 'apple', 'timstamp': 1505000002, 'amount': 3}
])
rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd.reduceByKey(lambda x, y: x + y).take(3)
print(rdd.collect())
The output I get (wrong): [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]
The output I want is:
[(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]
I think I'm not using reduceByKey correctly. Can someone tell me how to group the values by the key tuple?
Thanks!
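To clarify the grouping I expect, here is a plain-Python sketch of what I understand reduceByKey to do: sum all values that share the same key tuple. The `reduce_by_key` helper is hypothetical, just an emulation without Spark:

```python
# Hypothetical plain-Python emulation of reduceByKey semantics:
# values sharing the same (user, date) key tuple are combined with func.
def reduce_by_key(pairs, func):
    acc = {}
    for key, value in pairs:
        # Tuples are hashable, so a tuple key groups exactly like any dict key
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())

pairs = [
    (('user', '2017-07-13'), -1),
    (('user', '2017-07-13'), 2),
    (('user2', '2017-09-09'), -3),
]
print(reduce_by_key(pairs, lambda x, y: x + y))
# [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]
```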
[Discussion]:
Tags: python pyspark lambda reduce