【问题标题】:spark.RDD take(n) returns array of element n, n timesspark.RDD take(n) 返回元素 n 的数组,n 次
【发布时间】:2014-10-08 06:40:31
【问题描述】:

我正在使用来自 https://github.com/alexholmes/json-mapreduce 的代码将多行 json 文件读入 RDD。

var data = sc.newAPIHadoopFile( 
    filepath, 
    classOf[MultiLineJsonInputFormat], 
    classOf[LongWritable], 
    classOf[Text], 
    conf)

我打印了前 n 个元素以检查它是否正常工作。

data.take(n).foreach { p => 
  val (line, json) = p
  println
  println(new JSONObject(json.toString).toString(4))
}

但是,当我尝试查看数据时,take 返回的数组似乎不正确。

而不是返回表单的数组

[ data[0], data[1], ... data[n] ] 

它的形式是

[ data[n], data[n], ... data[n] ]

这是我创建的 RDD 的问题,还是我尝试打印它的方式的问题?

【问题讨论】:

    标签: json scala hadoop apache-spark


    【解决方案1】:

    我知道为什么 take 它返回一个包含重复值的数组。

    API 提到:

    Note: Because Hadoop's RecordReader class re-uses the same Writable object 
    for each record, directly caching the returned RDD will create many 
    references to the same object. If you plan to directly cache Hadoop 
    writable objects, you should first copy them using a map function.
    

    因此,在我的例子中,它重用了相同的 LongWritable 和 Text 对象。例如,如果我这样做了:

    val foo = data.take(5)
    foo.map( r => System.identityHashCode(r._1) )
    

    输出是:

    Array[Int] = Array(1805824193, 1805824193, 1805824193, 1805824193, 1805824193)
    

    所以为了防止它这样做,我只是将重用的对象映射到它们各自的值:

    val data = sc.newAPIHadoopFile(
        filepath,
        classOf[MultiLineJsonInputFormat],
        classOf[LongWritable],
        classOf[Text],
        conf ).map(p => (p._1.get, p._2.toString))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多