【发布时间】:2018-01-21 19:53:23
【问题描述】:
我将在 Spark 的上下文中提出这个问题,因为这就是我面临的问题,但这可能是一个普通的 Java 问题。
在我们的 spark 工作中,我们有一个 Resolver 需要在我们所有的工作人员中使用(它在 udf 中使用)。问题是它不可序列化,我们无法将其更改为可序列化。解决方案是将其作为另一个可序列化的类的成员。
所以我们最终得到:
public class Analyzer implements Serializable {
transient Resolver resolver;
public Analyzer() {
System.out.println("Initializing a Resolver...");
resolver = new Resolver();
}
public int resolve(String key) {
return resolver.find(key);
}
}
然后我们使用 Spark API broadcast 这个类:
val analyzer = sparkContext.broadcast(new Analyzer())
(更多关于 Spark 广播的信息可以找到here)
然后我们继续在 UDF 中使用 analyzer,作为 Spark 代码的一部分,例如:
val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()
这一切都按预期工作,但让我们感到疑惑。
Resolver 没有实现Serializable,因此被标记为transient——这意味着它不会与它的所有者对象Analyzer 一起被序列化。
但是从上面的代码可以清楚地看到,resolve() 方法使用的是resolver,所以它一定不能为空。事实并非如此。代码有效。
那么如果字段没有通过序列化传递,resolver成员是如何实例化的呢?
我最初的想法是Analyzer 构造函数可能在接收端(即火花工作者)被调用,但我希望看到"Initializing a Resolver..." 行打印多次。但它只打印一次,这可能表明它只被调用一次,就在它传递给广播 API 之前。那么为什么resolver 不为空呢?
我是否缺少有关 JVM 序列化或 Spark 序列化的内容?
这段代码是如何工作的?
Spark 在 YARN 上以cluster 模式运行。
spark.serializer 设置为 org.apache.spark.serializer.KryoSerializer。
【问题讨论】:
-
我没有完整的答案,但看起来这是由于一些 Kryo 内部的事情而发生的(广播变量使用 Kryo 序列化,如果您要通过闭包传递变量或使用
org.apache.spark.serializer.JavaSerializer正如预期的那样,它将为空)。 -
by the docs spark.apache.org/docs/latest/tuning.html 如果您没有在 spark 上下文中指定序列化,那么您正在使用默认的 java 序列化...而 kryo 只是忽略序列化声明或瞬态字段(您可以指示它避免尽管juanrh.github.io/doc/kryo/apidocs/com/esotericsoftware/kryo/…)是短暂的
-
@Zeromus 我觉得你的评论令人费解,因为:a) 我们特别声明了 kryo。 b) 如果我们删除
transient它会失败,并且堆栈跟踪会导致 kryo -
@summerbulb 我的错,我错过了问题的最后两行......
-
@summerbulb 你能添加不带瞬态属性的 Kryo 堆栈跟踪吗?
标签: java scala apache-spark serialization kryo