【发布时间】:2020-12-23 19:50:51
【问题描述】:
我一直在寻找是否有任何方法可以在 Pyspark 中使用 Scala 函数,但我没有找到任何有关此主题的文档或指南。
我的目标是使用之前人们定义的scala函数appendPrvlngFields隐式函数。然后我想在python环境中使用这个函数而不重新定义它,但是通过一些类型的方式,比如注册scala函数
假设我在 Scala 中创建了一个使用用户定义库的简单对象,例如:
%scala
package com.Example
import org.library.DataFrameImplicits._
import org.apache.spark.sql.DataFrame
object ScalaFunction {
def appendPrvlngFields(df: DataFrame,
otherDF: DataFrame,
colsToAppend: List[(String)] = List[(String)](),
mapColName: Option[String] = None,
partitionByCols: List[(String)],
sort: List[(String)],
sortBfirst: Boolean = false,
subsequent: Boolean = false,
existingPartitionsOnly: Boolean = false,
otherDFPrefix: String = "prvlg",
enforceLowerCase: Boolean = false
): DataFrame = {
return df.appendPrvlngFields(otherDF,
colsToAppend,
mapColName,
partitionByCols,
sort,
sortBfirst,
subsequent,
existingPartitionsOnly,
otherDFPrefix,
enforceLowerCase
)
}
}
然后在python环境中,我通过定义这个函数来调用函数appendPrvlngFields:
def pyAppendPrvlngFields(df: DataFrame,
otherDF: DataFrame,
colsToAppend: list,
partitionByCols: list,
sort: list,
mapColName = None,
sortBfirst = False,
subsequent = False,
existingPartitionsOnly = False,
otherDFPrefix = "prvlg",
enforceLowerCase = False) -> DataFrame:
return(DataFrame(sc._jvm.com.SRMG.ScalaPySpark.appendPrvlngFields(df._jdf,
otherDF._jdf,
colsToAppend,
mapColName,
partitionByCols,
sort,sortBfirst,
subsequent),
sqlContext))
我知道我需要将 df 转换为 df._jdf,但是如何将列表、字符串、Option、Boolean 转换为 java 类型?
【问题讨论】:
-
我认为与
_jdf之类的东西混在一起可能不合适 - 请参阅此链接以了解您的用例:spark.apache.org/docs/latest/api/python/… -
这能回答你的问题吗? How to call scala from python?
-
no... 我想我现在缺少的一步是如何将列表、字符串、选项、布尔值转换为 java 类型,就像我将 df 转换为 df._jdf 一样跨度>