【问题标题】:Resolve SparkException: Task not serializable when importing PMML model解决 SparkException:导入 PMML 模型时任务不可序列化
【发布时间】:2016-05-29 08:21:52
【问题描述】:

我想导入一个 PMML 模型,以使用 Spark 计算分数。当我不使用 spark 时,一切正常,但我无法在映射器中使用我的方法。

问题是我需要一个来自 org.jpmml.evaluator.Evaluator 的评估对象,它似乎不可序列化。因此,我尝试使用以下类使其成为 Serialiazable:

package util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.jpmml.evaluator.Evaluator;

public class SerializableEvaluator implements Serializable {

    private static final long serialVersionUID = 6631604036553063657L;
    private Evaluator evaluator;

    public SerializableEvaluator(Evaluator evaluator) {
        this.evaluator = evaluator;
    }

    public Evaluator getEvaluator() {
        return evaluator;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(evaluator);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Evaluator eval = (Evaluator) in.readObject();
    }
}

我还使我的所有类都可序列化。

这是我的代码示例:

        logger.info("Print 5 first rows----------------------------");
        strTitanicRDD
                .take(5)
                .forEach(row -> logger.info(row));
        logger.info("Print 5 first Titatnic Obs---------------------");
        strTitanicRDD
                .map(row -> new TitanicObservation(row))
                .take(5)
                .forEach(titanic -> logger.info(titanic.toString()));
        logger.info("Print 5 first Scored Titatnic Obs---------------");

        try{strTitanicRDD
            .map(row -> new TitanicObservation(row))
            .map(
                new Function<TitanicObservation,String>(){

                    private static final long serialVersionUID = -2968122030659306400L;

                    @Override
                    public String call(TitanicObservation titanic) throws Exception {
                        String res = PmmlUtil.computeScoreTitanic(evaluator, titanic);
                        return res;
                    }

                })
        .take(5)
        .forEach(row -> logger.info(row));

但我不认为我的代码会帮助你解决我的问题,这很清楚(见日志:)

org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:1623) 在 org.apache.spark.rdd.RDD.map(RDD.scala:286) 在 org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89) 在 org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46) 在 score.acv.AppWithSpark.main(AppWithSpark.java:117) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:497) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

引起:java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl 序列化栈:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more

【问题讨论】:

    标签: java serialization apache-spark pmml


    【解决方案1】:

    org.jpmml.evaluator.Evaluator 接口后面有一个org.jpmml.evaluator.ModelEvaluator 子类的实例。 ModelEvaluator 类及其所有子类在设计上都是可序列化的。问题与您在开始时提供给ModelEvaluatorFactory#newModelManager(PMML) 方法的org.dmg.pmml.PMML 对象实例有关。

    简而言之,每个 PMML 类模型对象都可以附加 SAX 定位器信息。这在开发和测试阶段用于定位有问题的 XML 内容非常有用。但是,在生产阶段,不应再保留此信息。您可以通过正确配置 JAXB 运行时禁用 SAX 定位器信息,或者通过使用 null 参数调用 PMMLObject#setLocator(Locatable) 来简单地清除现有的 SAX 定位器实例。后一种功能由org.jpmml.model.visitors.LocatorNullifierVisitor 类形式化。

    完整示例请参见官方JPMML-Spark projectorg.jpmml.spark.EvaluatorUtil 实用程序类(尤其是第73 到75 行附近)。为什么不首先使用 JPMML-Spark?

    【讨论】:

    • 非常感谢您的帮助。我没有使用 Jpmml-spark 因为它需要 Spark 1.5,我的应用程序可能需要使用 Spark 1.3.1。当我成功使用 JPMML 计算分数时,我认为将我的函数放入映射器会很简单。我对您的体验非常感兴趣,您是否将 PMML 置于生产环境中?根据您的说法,spark-JPMML 是一个可行的替代方案吗?
    猜你喜欢
    • 2018-09-08
    • 2020-07-01
    • 2020-09-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-22
    • 2021-09-17
    相关资源
    最近更新 更多