【发布时间】:2020-09-27 07:26:07
【问题描述】:
我有一个响应 API 调用的 Java 客户端类(用作 spark-shell 的依赖 Jar) - 让我们调用类 SomeAPIRequester。
在纯 Java 中,它会使用以下示例代码返回我想要的结果 -
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123")) // result: {"id123": "item123"}
我想通过存储在 spark 数据帧(在 scala 中)中的 ID 的 RDD 以分布式方式调用此 API -
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
我将我的 UDF 定义为 -
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
requester.getSomeItem(id)
})
并将此 UDF 称为 -
inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
// or directly with RDD ? udf, or a plain scala function ..
inputIdRdd.foreach{ id => test(id, requester) }
当我在结果上运行.show() 或.take() 时,我在请求者java 类上得到NullPointerException。
我还尝试发送文字 (lit),并在 scala 中阅读了有关 typedLit 的信息,但我无法将 Java Requester 类转换为 scala 中任何允许的 typedLit 类型。
有没有办法通过 UDF 调用这个 Java 类对象并从 API 中获取结果?
编辑:
我还尝试在 RDD 的 foreach 块中初始化请求者类 -
inputIdRdd.foreach(x =>{
val apiRequester = SomeAPIRequester.builder()...(argPool).build()
try {
apiRequester.getSomeItem(x)
} catch {
case ex: Exception => println(ex.printStackTrace()); ""
}
})
但这不会返回任何响应 - 无法初始化类等。
谢谢!
【问题讨论】:
-
您可能需要发布代码的整个结构。很难猜测在哪里声明了什么以及何时使用什么。此外,您正在使用带有 RDD 的 UDF,这很奇怪。
-
无论哪种方式,
df.withColumn("newCol", udf(lit(x), requester))也会引发与我的rdd.foreach{x => udf(x, requester)}类似的错误......对于我定义的 UDF。用更多细节更新我的问题..
标签: java scala apache-spark user-defined-functions