【Title】: Calling .agg on udf function throws error
【Posted】: 2019-10-08 03:31:48
【Question】:

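I am trying to apply LinearRegression to a set of bins that has already been generated. The DataFrame containing the bins currently looks like DataFrame[features: vector, trip_duration: int, prediction: double]. The bins are labeled by the prediction column. Currently, my code looks like this: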

    predictions = crossval.fit(trainingData).transform(trainingData)
    ''' 
    DataFrame[features: vector, trip_duration: int, prediction: double]
    '''
    transform_udf = udf(lambda x: vecAssembler.transform(x))
    bins = predictions.groupBy("prediction").agg(transform_udf(predictions.features)).show()


However, when I run this code, I get the following error:

Traceback (most recent call last):
  File "/opt/spark/python/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/spark/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
Py4JError: An error occurred while calling o163.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


Traceback (most recent call last):
  File "part2.py", line 118, in <module>
    main()
  File "part2.py", line 106, in main
    bins = predictions.groupBy("prediction").agg(transform_udf(predictions.features)).show()
  File "/opt/spark/python/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/opt/spark/python/pyspark/sql/udf.py", line 167, in __call__
    judf = self._judf
  File "/opt/spark/python/pyspark/sql/udf.py", line 151, in _judf
    self._judf_placeholder = self._create_judf()
  File "/opt/spark/python/pyspark/sql/udf.py", line 160, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/opt/spark/python/pyspark/sql/udf.py", line 35, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/spark/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/spark/python/pyspark/serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
cPickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o163.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

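How can I apply a linear regression model to the data that has a specific prediction? Note that I am trying to apply a linear regression model to all of the data grouped by prediction. So I want to run the LinearRegression model on each of: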

[row 6 - prediction 1,
row 4 - prediction 1,
row 8 - prediction 1]

[row 2 - prediction 2,
row 5 - prediction 2,
row 1 - prediction 2,
row 7 - prediction 2]

[row 3 - prediction 3]

without using pandas.

【Comments】:

    Tags: python apache-spark pyspark


    【Answer 1】:

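    Conveniently, for a linear regression of the form

    $$y = X\beta + \varepsilon,$$

    under the standard ordinary least squares assumptions, the estimated parameters have the following analytical solution:

    $$\hat{\beta} = (X^{T}X)^{-1}X^{T}y$$

    Here X is your features, y is your labels, and the superscripts T and -1 denote the matrix transpose and the matrix inverse, respectively.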
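
    As a minimal sketch of the analytical solution for ordinary least squares, beta_hat = (X^T X)^{-1} X^T y, the NumPy snippet below fits a tiny made-up dataset. The function name `ols_fit` and the toy data are assumptions for illustration only; they do not come from the question.

```python
import numpy as np

def ols_fit(X, y):
    """Return beta_hat = (X^T X)^{-1} X^T y for a design matrix X."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    # Solve the normal equations instead of forming the inverse explicitly;
    # the result is the same when X^T X is invertible.
    return np.linalg.solve(X.T.dot(X), X.T.dot(y))

# Toy data generated from y = 2 + 3*x.
# The leading column of ones lets the first coefficient act as the intercept.
X = np.array([[1.0, 0.0], [1.0, 1.0], [1.0, 2.0], [1.0, 3.0]])
y = np.array([2.0, 5.0, 8.0, 11.0])
beta = ols_fit(X, y)  # beta[0] is the intercept, beta[1] the slope
```

    Note that X^T X is only invertible when a group has at least as many linearly independent rows as there are parameters, so a bin holding a single row cannot be fitted this way.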

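    You can write a pandas_udf that computes the linear regression parameters with the formula above, and apply it after the groupBy. Note that the standard udf you are using now does not work with groupBy.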
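
    A rough sketch of what the per-group function for a grouped-map pandas_udf could look like is given below. It rests on several assumptions that are not in the question: the `features` vector has first been expanded into plain numeric columns (here the hypothetical `x1` and `x2`), since a grouped-map function receives each group as a pandas DataFrame; and `np.linalg.lstsq` is swapped in for the explicit normal equations, which yields the same parameters when X has full column rank and still returns a result for very small groups. The Spark wiring is shown only in comments and has not been run against the asker's data.

```python
import numpy as np
import pandas as pd

FEATURE_COLS = ["x1", "x2"]   # hypothetical flattened feature columns
LABEL_COL = "trip_duration"   # label column from the question

def fit_group(pdf):
    """Fit least squares on one group's rows; return a one-row frame."""
    # Design matrix with a leading column of ones for the intercept.
    X = np.column_stack([np.ones(len(pdf)), pdf[FEATURE_COLS].values])
    y = pdf[LABEL_COL].values.astype(float)
    beta = np.linalg.lstsq(X, y, rcond=None)[0]
    columns = ["prediction", "intercept"] + ["coef_" + c for c in FEATURE_COLS]
    values = [float(pdf["prediction"].iloc[0])] + [float(b) for b in beta]
    return pd.DataFrame([values], columns=columns)

# Possible wiring in Spark 2.3/2.4 (flat_df is a hypothetical DataFrame
# holding prediction, x1, x2 and trip_duration as plain columns):
#   from pyspark.sql.functions import pandas_udf, PandasUDFType
#   schema = "prediction double, intercept double, coef_x1 double, coef_x2 double"
#   fit_udf = pandas_udf(fit_group, schema, PandasUDFType.GROUPED_MAP)
#   params = flat_df.groupBy("prediction").apply(fit_udf)
```

    Each group is handed to `fit_group` as a whole, which is what a row-wise udf cannot do; the result is one row of parameters per prediction value.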

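    【Comments】:

    • Unfortunately I can't use pandas.
    • @wookieluvr13 Is 'from pyspark.sql.functions import pandas_udf, PandasUDFType' not available to you? If it is available, I suggest reading about grouped map at spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html to implement the grouping and then the linear regression. If it is not, unfortunately you will have to hard-code the linear regressions with a loop, e.g. .filter(col('prediction')==1) from label 1 through the last label, and then transform and fit them one by one. I hope it helps.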
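
    The loop-based fallback from the last comment could be sketched roughly as follows, with no pandas involved. The helper name `fit_per_group` is hypothetical, and the sketch assumes the number of distinct prediction values is small enough to collect to the driver. It only calls standard DataFrame methods (`select`, `distinct`, `collect`, `filter`) plus the estimator's `fit`, so nothing has to be pickled and no JVM-backed object such as `vecAssembler` ends up inside a Python closure, which is what the `__getnewargs__` error in the question points to.

```python
def fit_per_group(predictions, estimator, group_col="prediction"):
    """Fit one model per distinct value of group_col, driven from the driver.

    predictions: a Spark DataFrame such as
                 DataFrame[features: vector, trip_duration: int, prediction: double]
    estimator:   any pyspark.ml Estimator, e.g. a LinearRegression instance
    """
    # Collect the distinct bin labels to the driver (assumed to be few).
    labels = [row[group_col]
              for row in predictions.select(group_col).distinct().collect()]
    models = {}
    for label in sorted(labels):
        # An ordinary DataFrame filter selects the rows of one bin.
        subset = predictions.filter(predictions[group_col] == label)
        models[label] = estimator.fit(subset)
    return models

# Hypothetical usage with the question's column names (not executed here):
#   from pyspark.ml.regression import LinearRegression
#   lr = LinearRegression(featuresCol="features", labelCol="trip_duration")
#   models = fit_per_group(predictions, lr)
#   models[1.0].coefficients
```

    This launches one Spark job per bin, so it is only reasonable for a modest number of bins, and a bin with a single row (like prediction 3 in the question) will not give a meaningful fit.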