【问题标题】:Sum only certain values of an array based on a condition pyspark根据条件 pyspark 仅对数组的某些值求和
【发布时间】:2021-12-28 20:03:59
【问题描述】:

我想创建一个基于数组值总和的列。但是,如果总和超过目标值,它只会将创建低于或等于目标的最高值的值相加。这是一个例子:

| Target % | Array            | name      | Total
| ---------| -----------------|-----------|----------
| 4.5      | [1.5,2.5,3.0,2.0]| John      | 4.5
| 3        | [2.5,1.0,0.5,1.0]| Jim       | 3.0
| 5        | [1.0,1.0,1.5,1.0]| Jane      | 4.5

【问题讨论】:

    标签: arrays pyspark sum conditional-statements aggregate


    【解决方案1】:

    但是,如果总和超过目标值,它只会将创造最高值低于或等于目标值的值相加。

    要找到比目标值<= 的最高值总和,您必须找到不同值组合的总和,然后找到合适的值。

    这是一个例子:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from itertools import combinations
    from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType
    
    
    def find_highest(values, target):
        if not values:
            return None
        values.sort()
        max_value = values[0]
        if max_value > target:
            return None
        if max_value == target:
            return max_value
        for i in range(1, len(values)):
            if values[i] <= target and values[i] > max_value:
                max_value = values[i]
        return max_value
    
    
    def find_closest_sum(numbers, target):
        target = float(target)
        if sum(numbers) <= target:
            return sum(numbers)
        results = []
        for n in range(1, len(numbers) + 1):
            sumlist = [sum(l) for l in combinations(numbers, n)]
            highest = find_highest(sumlist, target)
            if highest:
                results.append(highest)
        return find_highest(results, target)
    
    
    spark = SparkSession.builder.getOrCreate()
    data = [
        {"Target": "4.5", "Array": [1.5, 2.5, 3.0, 2.0]},
        {"Target": "3", "Array": [2.5, 1.0, 0.5, 1.0]},
        {"Target": "5", "Array": [1.0, 1.0, 1.5, 1.0]},
        {"Target": "7", "Array": [5.0, 1.0, 4.0]},
    ]
    schema = StructType(
        [StructField("Target", StringType()), StructField("Array", ArrayType(DoubleType()))]
    )
    df = spark.createDataFrame(data=data, schema=schema)
    df = df.withColumn("Total", F.udf(find_closest_sum)(F.col("Array"), F.col("Target")))
    

    结果:

    +------+--------------------+-----+                                             
    |Target|Array               |Total|
    +------+--------------------+-----+
    |4.5   |[1.5, 2.5, 3.0, 2.0]|4.5  |
    |3     |[2.5, 1.0, 0.5, 1.0]|3.0  |
    |5     |[1.0, 1.0, 1.5, 1.0]|4.5  |
    |7     |[5.0, 1.0, 4.0]     |6.0  |
    +------+--------------------+-----+
    

    【讨论】:

    • 感谢您的帮助。这是我使用这个建议得到的错误:A NoneType 对象没有属性 _jvm。请检查对象的拼写和/或数据类型。我想知道它是否与这一行有关: df = df.withColumn("Total", F.udf(find_closest_sum)(F.col("Array"), F.col("Target"))) ^那个括号应该在那里吗?此外,Target 列是双精度,而不是字符串,所以我取出了浮点转换线。随着这些变化,错误是:
    • line 701, in nonzero raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', " ValueError : 无法将列转换为布尔值:在构建 DataFrame 布尔表达式时,请使用 '&' 代表 'and','|' 代表 'or','~' 代表 'not'。
    • df = df.withColumn("Total", F.udf(find_closest_sum)(F.col("Array"), F.col("Target"))) 我指的括号是 F.udf(find_closest_sum) 之后的那个
    • @AlexTriece 我不确定你是如何运行代码的。是的,括号应该在那里,它定义了一个将列作为参数的函数。您能否确认您可以运行我提供的代码并且它可以按原样运行(只是我提供的代码),然后我们可以调试为什么它在您的设置中不起作用。
    • 我已经运行了代码,但是我没有运行这个部分: spark = SparkSession.builder.getOrCreate() data = [ {"Target": "4.5", "Array": [1.5, 2.5, 3.0, 2.0]}, {"目标": "3", "数组": [2.5, 1.0, 0.5, 1.0]}, {"目标": "5", "数组": [1.0, 1.0, 1.5, 1.0]}, {"Target": "7", "Array": [5.0, 1.0, 4.0]}, ] schema = StructType( [StructField("Target", StringType()), StructField("Array" , ArrayType(DoubleType()))] ) df = spark.createDataFrame(data=data, schema=schema) 因为我在一个已经存在的数据帧上执行此操作。
    猜你喜欢
    • 2019-10-18
    • 2021-03-27
    • 1970-01-01
    • 1970-01-01
    • 2020-09-27
    • 2020-12-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多