【问题标题】:Pass python functions to Scala RDD in pyspark在 pyspark 中将 python 函数传递给 Scala RDD
【发布时间】:2019-11-14 10:56:09
【问题描述】:

我有一个 scala 库,它(简单地说)接收一个函数,将它应用到一个 RDD 并返回另一个 RDD

def runFunction(rdd: RDD, function: Any => Any) = {
    ....
    val res = rdd.map(function) 
    ...
}

在 scala 中的用法是

import mylibrary.runFunction
runFunction(myRdd, myScalaFun)

这个库被打包在一个 jar 中,我现在也想在 python 中使用它。我想做的是在 Python 中加载这个库并将一个 Python 函数传递给它。 Python 中的用法是:

spark._jvm.mylibrary.runFunction(myPythonRdd, myPythonFun)

这将允许我使用 python 函数以及 Scala 函数,而无需将整个库移植到 python。这是否可以通过 Spark 在 Python 和 JVM 之间来回切换的能力来实现?

【问题讨论】:

  • 在我看来,这一切很快就会变得难以维护......如果你不只是因为需要/喜欢 python 生态系统(对于 dataviz,ml ......)而使用 scala,我建议你看看Netflix's polynote,它允许您将两种语言无缝混合到一个笔记本中,并提供良好的 Spark 支持。

标签: apache-spark pyspark rdd py4j


【解决方案1】:

PySpark 中 Python 和 JVM 的通信方式有一些微妙之处。桥使用 Java 对象,即 JavaRDD 而不是 RDD,并且这些对象需要在 Scala 中显式拆箱。由于您的 Scala 函数采用 RDD,因此您需要在 Scala 中编写一个接收 JavaRDD 的包装器并首先执行拆箱:

def runFunctionWrapper(jrdd: JavaRDD, ...) = {
  runFunction(jrdd.rdd, ...)
}

然后这样称呼它

spark._jvm.mylibrary.runFunctionWrapper(myPythonRdd._jrdd, ...)

请注意,根据 Python 约定,_jrdd 被视为 Python RDD 类的私有成员,因此这实际上依赖于未记录的实现细节。这同样适用于SparkContext_jvm 成员。

真正的问题是让 Scala 回调到 Python 中以应用 function。在 PySpark 中,Python RDD 的 map() 方法创建了一个 org.apache.spark.api.python .PythonFunction 的实例,该实例包含对 Python 映射器函数及其环境的腌制引用。然后每个 RDD 分区都被序列化,并与通过 TCP 发送的腌制内容一起发送到与 Spark 执行器位于同一位置的 Python 进程,在其中对分区进行反序列化和迭代。最后,结果再次被序列化并发送回执行器。整个过程由org.apache.spark.api.python.PythonRunner 的实例编排。这与围绕 Python 函数构建包装器并将其传递给 RDD 实例的 map() 方法非常不同。

我相信最好在 Python 中简单地复制 runFunction 的功能,或者(在性能方面更好)在 Scala 中复制 myPythonFun 的功能。或者,如果您可以以交互方式完成操作,请遵循 @EnzoBnl 的建议,并使用 Zeppelin 或 Polynote 等多语言笔记本环境。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-09-11
    • 1970-01-01
    • 2017-08-01
    • 2020-03-15
    • 2019-06-19
    • 2015-07-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多