【Question Title】: org.apache.spark.SparkException: Task not serializable (scala)
【Posted】: 2015-12-30 10:43:59
【Question】:

I am new to Scala and Spark, so please help me with this. In the spark shell, when I load the functions below one by one, they run without any exception. But when I put the same functions inside a Scala object and load that file in the spark shell, the processbatch function throws a Task not serializable exception when it tries to parallelize. The code is below:

import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.spark.sql.hive.HiveContext

object Process {
    val hc = new HiveContext(sc)

    def processsingle(wait: Int, patient: org.apache.spark.sql.Row, visits: Array[org.apache.spark.sql.Row]) : String = {
        val out = new StringBuilder()
        for( x <- visits ) {
            out.append(", " + x.getAs("patientid") + ":" + x.getAs("visitid"))
        }
        out.toString
    }

    def processbatch(batch: Int, wait: Int, patients: Array[org.apache.spark.sql.Row], visits: Array[org.apache.spark.sql.Row]) = {
        val out = sc.parallelize(patients, batch).map( r=> processsingle(wait, r, visits.filter(f=> f.getAs("patientid") == r.getAs("patientid")))).collect()
        for(x <- out) println(x)
    }

    def processmeasures(fetch: Int, batch: Int, wait: Int) = {
        val processStart = getTimeInMillis() // getTimeInMillis/getExecutionTime assumed defined elsewhere

        val patients = hc.sql("SELECT patientid FROM tableName1 order by p_id").collect()
        val visit = hc.sql("SELECT patientid, visitid FROM tableName2")
        val count = patients.length
        val fetches = if(count % fetch > 0) (count / fetch + 1) else (count / fetch)


        for(i <- 0 until fetches){
            val startFetch = i*fetch
            val endFetch = math.min((i+1)*fetch, count)-1
            val fetchSize = endFetch - startFetch + 1
            val fetchClause = "patientid >= " + patients(startFetch).get(0) + " and patientid <= " + patients(endFetch).get(0)
            val fetchVisit = visit.filter( fetchClause ).collect()

            val batches = if(fetchSize % batch > 0) (fetchSize / batch + 1) else (fetchSize / batch)
            for(j <- 0 until batches){
                val startBatch = j*batch
                val endBatch = math.min((j+1)*batch, fetchSize)-1

                println(s"Batch from $startBatch to $endBatch")
                val batchVisits = fetchVisit.filter(g =>
                    g.getAs[Long]("patientid") >= patients(i*fetch + startBatch).getLong(0) &&
                    g.getAs[Long]("patientid") <= patients(math.min(i*fetch + endBatch + 1, endFetch)).getLong(0))
                processbatch(batch, wait, patients.slice(i*fetch + startBatch, i*fetch + endBatch + 1), batchVisits)
            }
        }
        println("Processing took " + getExecutionTime(processStart) + " millis")
    }

}

【Comments】:

  • Thanks, that solved the problem, but is it advisable to extend Serializable everywhere?
  • If you want to pass things into actions/transformations, the wrapping class has to be serializable. To be honest, there are multiple problems in your code, and serialization is the least of them.
  • Thanks for the quick reply. Since all of this is quite new to me, could you point out those problems so I can correct them?
  • It would probably be a better fit for Code Review, but for starters, these repeated collect / parallelize calls are simply wrong. They are extremely inefficient and achieve nothing. I would also avoid using functions passed to transformations to hold on to the context. A join-based sketch of the alternative follows this list.
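
The comment above points at a join: instead of collecting patients to the driver and re-parallelizing them per batch, let Spark pair patients with their visits in one distributed operation. A minimal sketch of that idea, assuming the same tableName1/tableName2 schema as in the question and the spark-shell's sc; the output formatting (mkString) is illustrative only:

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

val patients = hc.sql("SELECT patientid FROM tableName1")
val visits   = hc.sql("SELECT patientid, visitid FROM tableName2")

// One distributed join replaces the per-patient filter over a collected array.
val pairs = patients.join(visits, "patientid")

// Format each matched pair; collect only the final, small result to the driver.
pairs.rdd.map(_.mkString(":")).collect().foreach(println)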

Tags: scala serialization apache-spark


【Solution 1】:

You should make the Process object Serializable:

object Process extends Serializable {
  ...
}
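
This works because the map closure in processbatch calls processsingle, a method on Process, so Spark has to serialize the whole Process singleton to ship the closure to the executors, and that requires the object to be Serializable. A common alternative, sketched below under the question's assumptions (spark-shell sc, patientid stored as Long), is to hand the closure only serializable local values so the enclosing object is never captured:

def processbatch(batch: Int, wait: Int, patients: Array[org.apache.spark.sql.Row], visits: Array[org.apache.spark.sql.Row]) = {
    // A function literal held in a local val: Scala function values are
    // serializable, and this one references nothing on the enclosing object.
    val buildLine = (vs: Array[org.apache.spark.sql.Row]) => {
        val out = new StringBuilder()
        for (x <- vs) out.append(", " + x.getAs("patientid") + ":" + x.getAs("visitid"))
        out.toString
    }

    // visits is a method parameter, so the closure captures the local array
    // and the local function value rather than the whole Process object.
    val out = sc.parallelize(patients, batch)
        .map(r => buildLine(visits.filter(_.getAs[Long]("patientid") == r.getAs[Long]("patientid"))))
        .collect()
    for (x <- out) println(x)
}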

【Comments】:
