【问题标题】:Apply sklearn trained model on a dataframe with PySpark使用 PySpark 在数据帧上应用 sklearn 训练模型
【发布时间】:2017-05-31 13:14:50
【问题描述】:

我用 Python 训练了一个随机森林算法,并想用 PySpark 将它应用到一个大数据集上。

我首先加载了经过训练的 sklearn RF 模型(使用 joblib),将包含特征的数据加载到 Spark 数据帧中,然后我添加了一个包含预测的列,其中包含如下用户定义的函数:

def predictClass(features):
    return rf.predict(features)
udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', 
udfFunction('features'))

运行需要这么多时间,有没有更有效的方法来做同样的事情? (不使用 Spark ML)

【问题讨论】:

  • 如果我用 Pandas DataFrame 做同样的事情,它会在 2 秒内完成,比如 predictions = rf.predict(data['features'])
  • 是 scikit-learn 的 RF 还是 Spark 的 MLLib RF?
  • 这是一个 scikit-learn 射频

标签: python apache-spark scikit-learn pyspark


【解决方案1】:

在最近的项目中我不得不做同样的事情。为 pyspark 每次必须读取 sklearn 模型的每一行应用 udf 的坏事,这就是为什么需要很长时间才能完成的原因。我发现的最佳解决方案是在 rdd 上使用 .mapPartitions 或 foreachPartition 方法,这里有很好的解释

https://github.com/mahmoudparsian/pyspark-tutorial/blob/master/tutorial/map-partitions/README.md

它工作得很快,因为它确保你没有洗牌,并且对于每个分区,pyspark 必须读取模型并只预测一次。所以,流程是:

  • 将 DF 转换为 RDD
  • 将模型广播到节点,以便工作人员可以访问它
  • 编写一个 udf 函数,它将 interator(包含分区内的所有行)作为参数
  • 遍历行并使用您的特征创建一个适当的矩阵(顺序很重要)
  • 只调用 .predict 一次
  • 返回预测
  • 如果需要,将 rdd 转换为 df

【讨论】:

  • 你好,@Jacek 你能举个例子吗?
  • 回报应该如何?
【解决方案2】:

sklearn RF 模型在腌制时可能会很大。任务分派期间模型的频繁酸洗/取消酸洗可能会导致问题。您可以考虑使用广播变量。

来自official document

广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。 Spark 还尝试使用高效的广播算法分发广播变量以降低通信成本。

【讨论】:

    【解决方案3】:

    现在你也可以使用 spark 2.3 中引入的pandas_udf 来实现高处理速度和分布式计算。它基于pyarrow Apache Arrow 用于内存计算的python 实现。

    【讨论】:

    猜你喜欢
    • 2021-05-13
    • 2016-10-19
    • 2018-05-01
    • 2015-06-17
    • 2018-10-23
    • 2018-10-19
    • 2018-10-27
    • 2018-09-22
    • 2017-08-19
    相关资源
    最近更新 更多