【问题标题】:What is the difference between Spark Serialization and Java Serialization?Spark序列化和Java序列化有什么区别?
【发布时间】:2015-05-26 13:01:17
【问题描述】:

我正在使用 Spark + Yarn,并且我有一个要在分布式节点上调用的服务。

当我在使用 java 序列化的 Junit 测试中“手动”序列化此服务对象时,服务的所有内部集合都被很好地序列化和反序列化:

  @Test
  public void testSerialization() {  

    try (
        ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml");
        FileOutputStream fileOutputStream = new FileOutputStream("myService.ser");
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
        ) {

      final MyService service = (MyService) contextBusiness.getBean("myServiceImpl");

      objectOutputStream.writeObject(service);
      objectOutputStream.flush();

    } catch (final java.io.IOException e) {
      logger.error(e.getMessage(), e);
    }
  }

  @Test
  public void testDeSerialization() throws ClassNotFoundException {  

    try (
        FileInputStream fileInputStream = new FileInputStream("myService.ser");
        ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
        ) {

      final MyService myService = (MyService) objectInputStream.readObject();

      // HERE a functionnal test who proves the service has been fully serialized and deserialized      .

    } catch (final java.io.IOException e) {
      logger.error(e.getMessage(), e);
    }
  }  

但是当我尝试通过我的 Spark 启动器调用此服务时,无论我是否广播服务对象,一些内部集合(HashMap)都会消失(未序列化),就像它被标记为“瞬态”一样(但它是不是瞬态的,也不是静态的):

JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new   Function<InputOjbect, OutputObject>() {
  private static final long serialVersionUID = 1L;

  public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception

    MyOutput output = service.evaluate(input);
    return (new OutputObject(output));
  }
});

如果我广播服务,结果相同:

final Broadcast<MyService> broadcastedService = sprkCtx.broadcast(service);      
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new   Function<InputOjbect, OutputObject>() {
  private static final long serialVersionUID = 1L;

  public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception

    MyOutput output = broadcastedService.getValue().evaluate(input);
    return (new OutputObject(output));
  }
});

如果我在本地模式而不是纱线集群模式下启动相同的 Spark 代码,它会完美运行。

所以我的问题是:Spark 序列化和 Java 序列化有什么区别? (我没有使用 Kryo 或任何自定义序列化)。

编辑:当我尝试使用 Kryo 序列化程序(没有显式注册任何类)时,我遇到了同样的问题。

【问题讨论】:

  • 没有“Spark 序列化”之类的东西。您还没有显示关键细节,即这个 HashMap 字段。我认为它是某个地方的静态成员,实际上根本没有序列化。
  • 对不起,如果它根本不可序列化,如何在第一个示例中序列化?
  • 怎么可能序列化?这是关键问题
  • 对不起,我不明白你的意思。它是一个 HashMap ,其 value 是可序列化值的层次结构(我不能再展示它是一个复杂的对象)。它是可序列化的证明是在第一个例子中,你理解这个例子还是你需要精度?我的观点是 Spark 对序列化进行了一些额外的过滤,而不仅仅是 objectOutputStream/objectIutputStream 序列化
  • 这段代码中没有HashMap

标签: java serialization apache-spark


【解决方案1】:

好的,感谢我们的一位实验数据分析师。

那么,这个谜团是关于什么的?

  • 这与序列化无关(java 或 Kryo)
  • 这与 Spark 在序列化之前/之后所做的某些预处理或后处理无关
  • 这与完全可序列化的 HashMap 字段无关(如果您阅读了我给出的第一个示例,这一点很明显,但并不适合所有人;)

所以...

整个问题是关于这个的:

"如果我在本地模式而不是纱线集群中启动相同的 Spark 代码 模式,完美运行。”

在“yarn cluster”模式下,集合无法初始化,因为它是在随机节点上启动的,无法访问磁盘上的初始参考数据。在本地模式下,当在磁盘上找不到初始数据时出现明显异常,但在集群模式下它完全静默,看起来问题与序列化有关。

使用“yarn client”模式为我们解决了这个问题。

【讨论】:

    猜你喜欢
    • 2013-12-30
    • 2019-07-21
    • 2010-10-20
    • 2019-06-14
    • 2013-05-21
    • 2013-05-19
    • 1970-01-01
    • 1970-01-01
    • 2018-11-10
    相关资源
    最近更新 更多