【Question Title】: Task not serializable: Spark
【Posted】: 2020-04-29 16:55:04
【Question】:

My Spark job throws "Task not serializable" at runtime. Can anyone tell me what I am doing wrong here?

@Component("loader")
@Slf4j
public class LoaderSpark implements SparkJob {
    private static final int MAX_VERSIONS = 1;

    private final AppProperties props;


    public LoaderSpark(
            final AppProperties props
    ) {
        this.props = props;
    }


    @Override
    public void run(SparkSession spark, final String... args) throws IOException {

        HBaseUtil hBaseUtil = new HBaseUtil(props);

        byte[][] prefixes = new byte[][]{toBytes("document"),
                toBytes("dataSource"),
                toBytes("hold:")};

        Filter filter = new MultipleColumnPrefixFilter(prefixes);

        Scan scan = new Scan();
        scan.addFamily(toBytes("data"));
        scan.setCaching(100000);
        scan.setMaxVersions(MAX_VERSIONS);
        scan.setFilter(filter);


        JavaRDD<TestMethod> mapFileJavaRDD
                = hBaseUtil.createScanRdd(spark, "TestTable", scan).mapPartitions(tuple -> {

            return StreamUtils.asStream(tuple)
                    .map(this::extractResult)
                    .filter(Objects::nonNull)
                    .iterator();

        });


        Dataset<TestMethod> testDataset = spark.createDataset(mapFileJavaRDD.rdd(), bean(TestMethod.class));
        testDataset.limit(100);

    }

    private TestMethod extractResult(Tuple2<ImmutableBytesWritable, Result> resultTuple) {


        TestMethod.TestMethodBuilder testBuilder = TestMethod.builder();
        Result result;
        result = resultTuple._2();
        CdfParser cdfParser = new CdfParser();

        List<String> holdingId = new ArrayList<>();

        testBuilder.dataSource(Bytes.toString(result.getValue(Bytes.toBytes("data"),
                Bytes.toBytes("dataSource"))));
        testBuilder.document(cdfParser.fromXml(result.getValue(Bytes.toBytes("data"),
                Bytes.toBytes("document"))));

        NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data"));
        for (byte[] bQunitifer : familyMap.keySet()) {

            if (Bytes.toString(bQunitifer).contains("hold:")) {

                LOG.info(Bytes.toString(bQunitifer));
                holdingId.add(Bytes.toString(bQunitifer));

            }
        }
        testBuilder.holding(holdingId);

        return testBuilder.build();
    }

}

Here is the stack trace:

  2020-04-29 12:48:59,837 INFO  [Thread-4]o.a.s.d.y.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.IllegalStateException: Failed to execute CommandLineRunner
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
        at org.oclc.googlelinks.spark.SpringSparkJob.main(SpringSparkJob.java:56)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:694)
    Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
        at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
        at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
        at org.oclc.googlelinks.spark.job.LoaderSpark.run(LoaderSpark.java:79)
        at org.oclc.googlelinks.spark.SpringSparkJob.run(SpringSparkJob.java:79)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
        ... 8 more

【Comments】:

  • Implementing Serializable is for persisting an object's state; LoaderSpark is definitely not a bean that is meant to hold state.
  • Any idea why it still fails with Task not serializable? I removed Serializable from LoaderSpark.
  • It needs getters and setters for all object fields so they can be serialized and deserialized.
  • There should be more detail in the error log saying which class is not serializable; can you post that part of the log?

Tags: scala apache-spark serialization


【Solution 1】:

Try adding a getter and setter for props:

// Note: the props field must not be declared final, or this setter will not compile.
public void setProps(AppProperties props) {
    this.props = props;
}

public AppProperties getProps() {
    return props;
}
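
For this to work at all, the enclosing class also has to be serializable: the this::extractResult reference in run makes Spark serialize the whole LoaderSpark instance. A minimal sketch of the class declaration under that assumption (the serialVersionUID and the transient modifier are illustrative additions, not from the original post):

import java.io.Serializable;

@Component("loader")
@Slf4j
public class LoaderSpark implements SparkJob, Serializable {

    private static final long serialVersionUID = 1L; // illustrative

    // No longer final, so setProps above compiles. Marked transient on the
    // assumption that AppProperties is not itself Serializable; extractResult
    // never reads it, so the executors do not need its value anyway.
    private transient AppProperties props;

    public LoaderSpark(final AppProperties props) {
        this.props = props;
    }

    // getProps()/setProps() as above; run(...) and extractResult(...) unchanged.
}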

【Comments】:

  • I hope you kept implements Serializable.
【Solution 2】:

Simply make the method extractResult static. Calling a static method does not require serializing the class; it only requires that the declaring class be reachable by the class loader (which it is, since the jar archive is shared between the driver and the workers).

Thanks to https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/
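
A minimal sketch of that change against the question's code. The only assumption is that extractResult reads no instance state, which holds here: it uses only its parameter and a locally constructed CdfParser (and Lombok's @Slf4j logger is a static field, so logging from a static method still works):

// In run(...): a static method reference does not capture the enclosing
// LoaderSpark instance, so the closure stays serializable.
JavaRDD<TestMethod> mapFileJavaRDD =
        hBaseUtil.createScanRdd(spark, "TestTable", scan)
                .mapPartitions(tuple -> StreamUtils.asStream(tuple)
                        .map(LoaderSpark::extractResult)
                        .filter(Objects::nonNull)
                        .iterator());

// Declared static: executors load it by class name from the shared jar
// instead of receiving it inside a serialized closure.
private static TestMethod extractResult(Tuple2<ImmutableBytesWritable, Result> resultTuple) {
    // ... body unchanged from the question ...
}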

【Comments】:
