[Title]: Task had a not serializable result in Spark
[Posted]: 2024-01-16 14:19:01
[Question]:

I am trying to read a Cassandra table using the Cassandra driver. Here is the code:

val x = 1 to 2
val rdd = sc.parallelize(x)

val query = "Select data from testkeyspace.testtable where id=%d"

val cc = CassandraConnector(sc.getConf)

val res1 = rdd.map { it =>
    cc.withSessionDo { session =>
        session.execute(query.format(it))
    }
}

res1.take(1).foreach(println)

But I get the exception "Task had a not serializable result":

  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 24.0 (TID 77) had a not serializable result: com.datastax.driver.core.ArrayBackedResultSet$SinglePage
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

How can I fix this?

[Question discussion]:

    Tags: apache-spark spark-cassandra-connector


    [Solution 1]:

    The non-serializable object in the transformation is the result returned from Cassandra: the `ResultSet` is an iterable over the query's rows, and Spark cannot ship it back from the tasks. You generally want to materialize that collection inside the transformation.

    One way is to fetch all the records produced by the query:

    session.execute(query.format(it)).all()
    
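    A sketch of a more complete fix, combining this answer with the follow-up discussion below: extract plain values from each `Row` inside the closure, so nothing from the driver's object model is returned by the task. The column name `"data"` comes from the question's query; its type is an assumption here (taken as text, hence `getString`; swap in the column's actual getter).

    import scala.collection.JavaConverters._

    // Hypothetical sketch, not tested against a live cluster.
    val res1 = rdd.map { it =>
        cc.withSessionDo { session =>
            session.execute(query.format(it))
                .all().asScala              // materialize the result set
                .map(_.getString("data"))   // Row -> serializable String; assumes a text column
                .toList
        }
    }

    res1.take(1).foreach(println)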

    [Discussion]:

    • 即使我做了所有我得到同样的例外。 “任务有一个不可序列化的结果”
    • @Knight71 有同样的原因吗?
    • 这次使用不同的数据类型。 org.apache.spark.scheduler.TaskSetManager: Task 1.0 in stage 1.0 (TID 2) had a not serializable result: com.datastax.driver.core.ArrayBackedRow; not retrying .org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 1.0 (TID 2) had a not serializable result: com.datastax.driver.core.ArrayBackedRow
    • @Knight71 然后跟随面包屑:https://github.com/datastax/java-driver/blob/2.1/driver-core/src/main/java/com/datastax/driver/core/ArrayBackedRow.java 不可序列化。这个想法是map操作创建的对象需要是可序列化的。您需要将 Row 转换为某种可序列化的形式:例如session.execute(query.format(it)).one().getXXXXX("data")(不确定“数据”是什么)
    • 谢谢。我会试试的。
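    The failure seen in these comments can be reproduced without a cluster: Spark Java-serializes every value a task returns, and the driver's `ResultSet`/`ArrayBackedRow` classes do not implement `java.io.Serializable`. A minimal, hypothetical sketch (`FakeRow` is a made-up stand-in for `ArrayBackedRow`):

    ```scala
    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    object SerializableCheck {
      // True if Java serialization (what Spark uses for task results by default)
      // accepts the value; false on NotSerializableException.
      def isJavaSerializable(v: AnyRef): Boolean =
        try {
          val out = new ObjectOutputStream(new ByteArrayOutputStream())
          out.writeObject(v)
          out.close()
          true
        } catch {
          case _: NotSerializableException => false
        }

      // Stand-in for ArrayBackedRow: does not extend Serializable.
      class FakeRow

      def main(args: Array[String]): Unit = {
        println(isJavaSerializable("some data")) // true: plain extracted values are fine
        println(isJavaSerializable(new FakeRow)) // false: raw driver rows are not
      }
    }
    ```

    This is why extracting the column values (e.g. as Strings) inside the closure works, while returning the rows themselves does not.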