【问题标题】:reduceByKey in pysparkpyspark 中的 reduceByKey
【发布时间】:2018-08-07 22:03:33
【问题描述】:

我有一个类似于以下的 rdd:

s = sc.parallelize([(901943132160, {'P1': 0.0, 'cust_id': 'C5'}), (901943132160, {'P2': 0.125, 'cust_id': 'C5'}), (901943132160, {'cust_id': 'C5', 'P3': 0.875}), (901943132160, {'P4': 0.0, 'cust_id': 'C5'}), (901943132160, {'P5': 0.0, 'cust_id': 'C5'}), (901943132160, {'P8': 0.0, 'cust_id': 'C5'}), (901943132160, {'cust_id': 'C5', 'P9': 0.875}), (1357209665536, {'P1': 0.0, 'cust_id': 'C2'}), (1357209665536, {'P2': 0.0, 'cust_id': 'C2'}), (1357209665536, {'cust_id': 'C2', 'P3': 1.0}), (1357209665536, {'P4': 0.0, 'cust_id': 'C2'}), (1357209665536, {'P5': 0.0, 'cust_id': 'C2'}), (1357209665536, {'P8': 0.0, 'cust_id': 'C2'}), (1357209665536, {'cust_id': 'C2', 'P9': 0.75}), (489626271744, {'P1': 0.0, 'cust_id': 'C4'}), (489626271744, {'P2': 0.0, 'cust_id': 'C4'}), (489626271744, {'cust_id': 'C4', 'P3': 0.5}), (489626271744, {'P4': 0.5, 'cust_id': 'C4'}), (489626271744, {'P5': 0.0, 'cust_id': 'C4'}), (489626271744, {'P8': 0.125, 'cust_id': 'C4'}), (489626271744, {'cust_id': 'C4', 'P9': 0.375}), (463856467968, {'P1': 0.08333333333333333, 'cust_id': 'C3'}), (463856467968, {'P2': 0.3333333333333333, 'cust_id': 'C3'}), (463856467968, {'cust_id': 'C3', 'P3': 0.3333333333333333}), (463856467968, {'P4': 0.08333333333333333, 'cust_id': 'C3'}), (463856467968, {'P5': 0.08333333333333333, 'cust_id': 'C3'}), (463856467968, {'P8': 0.3333333333333333, 'cust_id': 'C3'}), (463856467968, {'cust_id': 'C3', 'P9': 0.3333333333333333}), (1305670057984, {'P1': 0.5, 'cust_id': 'C1'}), (1305670057984, {'P2': 0.375, 'cust_id': 'C1'}), (1305670057984, {'cust_id': 'C1', 'P3': 0.0}), (1305670057984, {'P4': 0.0, 'cust_id': 'C1'}), (1305670057984, {'P5': 0.6875, 'cust_id': 'C1'}), (1305670057984, {'P8': 0.0625, 'cust_id': 'C1'}), (1305670057984, {'cust_id': 'C1', 'P9': 0.0625})])

我想编写一个 reduceByKey 操作,我期待类似于以下内容:

[('C3', {'P8': 0.3333333333333333, 'P1': 0.08333333333333333, 'P9': 0.3333333333333333, 'P2': 0.3333333333333333, 'P3': 0.3333333333333333}), ('C4', {'P9': 0.375, 'P3': 0.5, 'P4': 0.5}), ('C5', {'P9': 0.875, 'P2': 0.125, 'P3': 0.875}), ('C1', {'P1': 0.5,'P2': 0.375,'P5': 0.6875}), ('C2', {'P9': 0.75, 'P3': 1.0, 'P5': 0.0})]

我想要做的是,对于每个产品,我都会检查分数,只返回三个分数最好的产品。计划通过 reduceByKey 来做到这一点

【问题讨论】:

    标签: python-3.x apache-spark pyspark rdd


    【解决方案1】:

    您可以通过执行以下操作来实现您的要求

    def dictionaryFunc(x):
        d = {}
        for i in range(0, len(x), 2):
            d[x[i]] = x[i+1]
        return d
    
    from operator import add
    s.map(lambda x: (x[1]['cust_id'], sorted(x[1].items())[0])).reduceByKey(add).map(lambda x: [x[0], dictionaryFunc(x[1])])
    

    其中
    x[1]['cust_id'] 是每个字典的 cust_id 键的值
    sorted(x[1].items())[0] 是排序字典的第一个元素
    reduceByKey(add) 是添加分组的第二个元素元组
    dictionaryFunc(x[1])] 正在根据您的要求组成字典

    【讨论】:

      猜你喜欢
      • 2017-05-17
      • 2018-06-27
      • 2015-12-09
      • 1970-01-01
      • 1970-01-01
      • 2016-06-05
      • 1970-01-01
      • 2016-12-27
      • 2015-07-02
      相关资源
      最近更新 更多