【发布时间】:2015-05-26 13:01:17
【问题描述】:
我正在使用 Spark + Yarn,并且我有一个要在分布式节点上调用的服务。
当我在使用 java 序列化的 Junit 测试中“手动”序列化此服务对象时,服务的所有内部集合都被很好地序列化和反序列化:
@Test
public void testSerialization() {
try (
ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml");
FileOutputStream fileOutputStream = new FileOutputStream("myService.ser");
ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream);
) {
final MyService service = (MyService) contextBusiness.getBean("myServiceImpl");
objectOutputStream.writeObject(service);
objectOutputStream.flush();
} catch (final java.io.IOException e) {
logger.error(e.getMessage(), e);
}
}
@Test
public void testDeSerialization() throws ClassNotFoundException {
try (
FileInputStream fileInputStream = new FileInputStream("myService.ser");
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
) {
final MyService myService = (MyService) objectInputStream.readObject();
// HERE a functionnal test who proves the service has been fully serialized and deserialized .
} catch (final java.io.IOException e) {
logger.error(e.getMessage(), e);
}
}
但是当我尝试通过我的 Spark 启动器调用此服务时,无论我是否广播服务对象,一些内部集合(HashMap)都会消失(未序列化),就像它被标记为“瞬态”一样(但它是不是瞬态的,也不是静态的):
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new Function<InputOjbect, OutputObject>() {
private static final long serialVersionUID = 1L;
public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception
MyOutput output = service.evaluate(input);
return (new OutputObject(output));
}
});
如果我广播服务,结果相同:
final Broadcast<MyService> broadcastedService = sprkCtx.broadcast(service);
JavaRDD<InputOjbect> listeInputsRDD = sprkCtx.parallelize(listeInputs, 10);
JavaRDD<OutputObject> listeOutputsRDD = listeInputsRDD.map(new Function<InputOjbect, OutputObject>() {
private static final long serialVersionUID = 1L;
public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception
MyOutput output = broadcastedService.getValue().evaluate(input);
return (new OutputObject(output));
}
});
如果我在本地模式而不是纱线集群模式下启动相同的 Spark 代码,它会完美运行。
所以我的问题是:Spark 序列化和 Java 序列化有什么区别? (我没有使用 Kryo 或任何自定义序列化)。
编辑:当我尝试使用 Kryo 序列化程序(没有显式注册任何类)时,我遇到了同样的问题。
【问题讨论】:
-
没有“Spark 序列化”之类的东西。您还没有显示关键细节,即这个 HashMap 字段。我认为它是某个地方的静态成员,实际上根本没有序列化。
-
对不起,如果它根本不可序列化,如何在第一个示例中序列化?
-
怎么可能序列化?这是关键问题
-
对不起,我不明白你的意思。它是一个 HashMap
,其 value 是可序列化值的层次结构(我不能再展示它是一个复杂的对象)。它是可序列化的证明是在第一个例子中,你理解这个例子还是你需要精度?我的观点是 Spark 对序列化进行了一些额外的过滤,而不仅仅是 objectOutputStream/objectIutputStream 序列化 -
这段代码中没有HashMap
标签: java serialization apache-spark