【问题标题】:PySpark create combinations using UDFPySpark 使用 UDF 创建组合
【发布时间】:2018-04-13 07:40:54
【问题描述】:

这可能是一个基本问题,但我已经被困了一段时间了。

我的列名很少,我正在尝试创建一个组合列表,将 Spark 中的两个元素组合在一起。这是我尝试创建组合的列表

numeric_cols = ["age", "hours-per-week", "fnlwgt"]

我正在使用来自itertools 模块的combinations

from itertools import combinations
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def combinations2(x): return combinations(x,2)
udf_combinations2 = udf(combinations2,ArrayType())

但是在运行时

pairs  = udf_combinations2(numeric_cols)

我收到以下错误

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/sg/Downloads/spark/python/pyspark/sql/udf.py", line 179, in wrapper
    return self(*args)
  File "/Users/sg/Downloads/spark/python/pyspark/sql/udf.py", line 159, in __call__
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
  File "/Users/sg/Downloads/spark/python/pyspark/sql/column.py", line 66, in _to_seq
    cols = [converter(c) for c in cols]
  File "/Users/sg/Downloads/spark/python/pyspark/sql/column.py", line 66, in <listcomp>
    cols = [converter(c) for c in cols]
  File "/Users/sg/Downloads/spark/python/pyspark/sql/column.py", line 54, in _to_java_column
    "function.".format(col, type(col)))
TypeError: Invalid argument, not a string or column: ['age', 'hours-per-week', 'fnlwgt'] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

对于这种情况,我不确定如何使用最后一行中提到的功能。任何方向和提示都会很棒。

谢谢

【问题讨论】:

  • 您需要为此使用UDF 吗?为什么不直接运行pairs = combinations2(numeric_cols)
  • 我想我必须这样做。我的数据集有数千列,具体取决于它们的活动。 python combinations 可能需要很长时间。

标签: apache-spark pyspark spark-dataframe user-defined-functions


【解决方案1】:

首先正确定义udf

df = spark.createDataFrame([(1, 2 ,3)], ("age", "hours-per-week", "fnlwgt"))

你可以用单个参数来定义它

@udf("array<struct<_1: double, _2: double>>")
def combinations_list(x):
   return combinations(x, 2)

或可变参数

@udf("array<struct<_1: double, _2: double>>")
def combinations_varargs(*x):
   return combinations(list(x), 2)

在这两种情况下您都为输出数组声明类型。这里我们将使用doublestructs

确保输入类型与声明的输出类型匹配:

from pyspark.sql.functions import col

numeric_cols = [
    col(c).cast("double") for c in ["age", "hours-per-week", "fnlwgt"]
]

要调用单个参数版本,请使用array

from pyspark.sql.functions import array

df.select(
     combinations_list(array(*numeric_cols)).alias("combinations")
).show(truncate=False)
# +---------------------------------+
# |combinations                     |
# +---------------------------------+
# |[[1.0,2.0], [1.0,3.0], [2.0,3.0]]|
# +---------------------------------+

调用 varargs 变体解包值

df.select(
     combinations_varargs(*numeric_cols).alias("combinations")
).show(truncate=False)
# +---------------------------------+
# |combinations                     |
# +---------------------------------+
# |[[1.0,2.0], [1.0,3.0], [2.0,3.0]]|
# +---------------------------------+

【讨论】:

  • 你也可以用 numpy 做类似的事情。在我的机器上 4000 列大约需要 6 秒。
猜你喜欢
  • 2018-08-27
  • 1970-01-01
  • 1970-01-01
  • 2022-11-15
  • 2016-11-24
  • 2021-06-11
  • 1970-01-01
  • 1970-01-01
  • 2023-03-25
相关资源
最近更新 更多