【问题标题】:Read from Spark RDD a Kryo File从 Spark RDD 读取 Kryo 文件
【发布时间】:2014-05-12 20:01:07
【问题描述】:

我是 Spark 和 Scala 新手。

我需要读取和分析 Spark 中的一个文件,该文件是用我的 scala 代码编写的,并使用 Kryo 序列化:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

val kryo:Kryo = new Kryo()
val output:Output = new Output(new FileOutputStream("filename.ext",true))

//kryo.writeObject(output, feed) (tested both line)
kryo.writeClassAndObject(output, myScalaObject)

这是用我的对象(myScalaObject)序列化创建文件的伪代码,这是一个复杂的对象。

该文件似乎写得很好,但我在 Spark RDD 中读取它时遇到问题

Spark 中的伪代码:

val conf = new SparkConf()
    .setMaster("local")
    .setAppName("My application")
    .set("spark.executor.memory", "1g")


conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "myScalaObject")

val sc = new SparkContext(conf)

val file=sc.objectFile[myScalaObject]("filename.ext")

val counts = file.count()

当我尝试执行它时,我收到此错误:

org.apache.spark.SparkException: 
Job aborted: Task 0.0:0 failed 1 times (most recent failure: 
Exception failure: java.io.IOException: file: filename.ext not a SequenceFile)

是否可以在 Spark 中读取这种类型的文件?

如果此解决方案不可行,创建复杂文件结构以在 Spark 中读取的好的解决方案是什么?

谢谢

【问题讨论】:

  • objectFile 用于加载保存为包含序列化对象的 SequenceFile 的RDD。为什么不直接使用 Kryo 读取对象并使用parallel 生成RDD
  • @zsxwing 谢谢,好主意,我试试。但是我有很多小(5-20​​mb)并且不想并行化文件的内容。有没有办法并行化文件名,然后每个服务器读取它的文件?
  • 用文件名创建一个RDD并用map读取内容?
  • @zsxwing 我已经创建了一个带有文件名的 RDD 并将其并行化,工作正常。谢谢

标签: apache-spark kryo


【解决方案1】:

如果你想用objectFile读取数据,用saveAsObjectFile写出数据。

val myObjects: Seq[MyObject] = ...
val rddToSave = sc.parallelize(myObjects) // Or better yet: construct as RDD from the start.
rddToSave.saveAsObjectFile("/tmp/x")
val rddLoaded = sc.objectFile[MyObject]("/tmp/x")

或者,正如zsxwing 所说,您可以创建文件名的RDD,并使用map 读取每个文件的内容。如果希望将每个文件读入单独的分区,请将文件名并行化到单独的分区中:

def loadFiles(filenames: Seq[String]): RDD[Object] = {
  def load(filename: String): Object = {
    val input = new Input(new FileInputStream(filename))
    return kryo.readClassAndObject(input)
  }
  val partitions = filenames.length
  return sc.parallelize(filenames, partitions).map(load)
}

【讨论】:

  • 您现在也可以使用sc.wholeTextFiles。我必须在某个时候更新答案。
猜你喜欢
  • 2021-07-29
  • 2017-08-05
  • 1970-01-01
  • 1970-01-01
  • 2017-08-21
  • 2019-11-12
  • 2020-10-28
  • 2021-06-18
  • 1970-01-01
相关资源
最近更新 更多