【问题标题】:Pyspark apply a function on dataframePyspark 在数据框上应用函数
【发布时间】:2020-01-09 20:15:01
【问题描述】:

我的python方法如下;

def leadtime_crossdock_calc(slt, wlt, dow, freq):
    temp_lt = [0, 0, 0, 0, 0, 0, 0]
    remaining = []
    for i in range(0, 7):
        remaining.append((dow[i:] + dow[:i]).index(1))
    for i in range(7):
        if freq[i] == 1:
            supplier_lt = int(slt[i])
            warehouse_lt = int(wlt[(i + supplier_lt) % 7])
            waiting = int(remaining[(i + supplier_lt + warehouse_lt) % 7])
            temp_lt[i] = supplier_lt + warehouse_lt + waiting
    for i in range(7):
        if temp_lt[i] == 0:
            temp_lt[i] = next((value for index, value in enumerate(temp_lt[i:] + temp_lt[:i]) if value), None)
    return ''.join(str(x) for x in temp_lt)

下面的例子;

leadtime_crossdock_calc([0,2,0,2,0,3,0],[1,1,1,1,1,1,1],[0,0,1,0,1,0,1],[0,1,0,1,0,1,0])

'3333443'

问题是,我有一个如下所示的 spark 数据框;

Product  Store  slt               wlt            dow              freq
A         B     [0,2,0,2,0,3,0]  [1,1,1,1,1,1,1] [0,0,1,0,1,0,1]  [0,1,0,1,0,1,0]

我想使用上述方法为数据框中的每个新行创建一个新列;

Product  Store  slt               wlt            dow              freq              result
A         B     [0,2,0,2,0,3,0]  [1,1,1,1,1,1,1] [0,0,1,0,1,0,1]  [0,1,0,1,0,1,0]   [3,3,3,3,4,4,3]

你能帮我解决这个问题吗?我无法将方法应用于 spark 数据帧。

【问题讨论】:

    标签: python function dataframe pyspark apache-spark-sql


    【解决方案1】:

    您可以使用User Defined Functions or UDF 首先在 spark 上注册你的 UDF,指定返回函数类型。你可以使用这样的东西:

    from pyspark.sql.types import StringType, col
    leadtime_udf = spark.udf.register("leadtime_udf", leadtime_crossdock_calc, StringType())
    

    然后,您可以将该 UDF 应用于您的 DataFrame(或在 Spark SQL 中)

    df.select("*", leadtime_udf(col(slt), ... , col(freq)))
    

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 2020-04-21
      • 1970-01-01
      • 1970-01-01
      • 2018-02-25
      • 2019-06-18
      • 1970-01-01
      • 2022-01-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多