【问题标题】:Spark - Serializing an object with a non-serializable memberSpark - 使用不可序列化成员序列化对象
【发布时间】:2018-01-21 19:53:23
【问题描述】:

我将在 Spark 的上下文中提出这个问题,因为这就是我面临的问题,但这可能是一个普通的 Java 问题。

在我们的 spark 工作中,我们有一个 Resolver 需要在我们所有的工作人员中使用(它在 udf 中使用)。问题是它不可序列化,我们无法将其更改为可序列化。解决方案是将其作为另一个可序列化的类的成员

所以我们最终得到:

public class Analyzer implements Serializable {
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    public int resolve(String key) {
         return resolver.find(key);
    }
}

然后我们使用 Spark API broadcast 这个类:

 val analyzer = sparkContext.broadcast(new Analyzer())

(更多关于 Spark 广播的信息可以找到here

然后我们继续在 UDF 中使用 analyzer,作为 Spark 代码的一部分,例如:

val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()

这一切都按预期工作,但让我们感到疑惑。

Resolver 没有实现Serializable,因此被标记为transient——这意味着它不会与它的所有者对象Analyzer 一起被序列化。

但是从上面的代码可以清楚地看到,resolve() 方法使用的是resolver,所以它一定不能为空。事实并非如此。代码有效。

那么如果字段没有通过序列化传递,resolver成员是如何实例化的呢?

我最初的想法是Analyzer 构造函数可能在接收端(即火花工作者)被调用,但我希望看到"Initializing a Resolver..." 行打印多次。但它只打印一次,这可能表明它只被调用一次,就在它传递给广播 API 之前。那么为什么resolver 不为空呢?

我是否缺少有关 JVM 序列化或 Spark 序列化的内容?

这段代码是如何工作的?

Spark 在 YARN 上以cluster 模式运行。 spark.serializer 设置为 org.apache.spark.serializer.KryoSerializer

【问题讨论】:

  • 我没有完整的答案,但看起来这是由于一些 Kryo 内部的事情而发生的(广播变量使用 Kryo 序列化,如果您要通过闭包传递变量或使用 org.apache.spark.serializer.JavaSerializer正如预期的那样,它将为空)。
  • by the docs spark.apache.org/docs/latest/tuning.html 如果您没有在 spark 上下文中指定序列化,那么您正在使用默认的 java 序列化...而 kryo 只是忽略序列化声明或瞬态字段(您可以指示它避免尽管juanrh.github.io/doc/kryo/apidocs/com/esotericsoftware/kryo/…)是短暂的
  • @Zeromus 我觉得你的评论令人费解,因为:a) 我们特别声明了 kryo。 b) 如果我们删除 transient 它会失败,并且堆栈跟踪会导致 kryo
  • @summerbulb 我的错,我错过了问题的最后两行......
  • @summerbulb 你能添加不带瞬态属性的 Kryo 堆栈跟踪吗?

标签: java scala apache-spark serialization kryo


【解决方案1】:

那么如果该字段没有通过序列化传递,如何 解析器成员实例化了吗?

在调用kryo.readObject时通过构造函数调用(new Resolver)实例化:

kryo.readClassAndObject(input).asInstanceOf[T]

我最初的想法是可能调用了 Analyzer 构造函数 在接收方(即火花工作者),但我希望 看到“Initializing a Resolver...”行打印了几次。 但它只打印了一次,这可能表明 事实上它只被调用一次

广播变量不是这样工作的。发生的情况是,当每个 Executor 需要范围内的广播变量时​​,它首先检查其内存中的 BlockManager 中是否有对象,如果没有,它会询问驱动程序或邻居执行程序(如果有多个同一 Worker 节点上的执行程序)为其缓存实例,然后将其序列化并将其发送给他,然后他接收该实例并将其缓存在他自己的 BlockManager 中。

这记录在TorrentBroadcast(这是默认的广播实现)的行为中:

* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).

如果我们删除瞬态,它会失败,堆栈跟踪会导致 Kryo

那是因为Resolver 类中可能有一个字段,即使是 Kryo 也无法序列化,无论 Serializable 属性如何。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-10
    • 2016-06-19
    • 2015-07-20
    • 1970-01-01
    • 2018-08-01
    • 2020-02-06
    相关资源
    最近更新 更多