【发布时间】:2014-10-05 05:37:55
【问题描述】:
我使用的是 Apache Spark 1.0.1。我有许多用 UTF8 \u0001 分隔的文件,而不是通常的新行 \n。如何在 Spark 中读取此类文件?意思是sc.textfile("hdfs:///myproject/*")的默认分隔符是\n,我想改成\u0001。
【问题讨论】:
标签: apache-spark
我使用的是 Apache Spark 1.0.1。我有许多用 UTF8 \u0001 分隔的文件,而不是通常的新行 \n。如何在 Spark 中读取此类文件?意思是sc.textfile("hdfs:///myproject/*")的默认分隔符是\n,我想改成\u0001。
【问题讨论】:
标签: apache-spark
您可以使用textinputformat.record.delimiter为TextInputFormat设置分隔符,例如,
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "X")
val input = sc.newAPIHadoopFile("file_path", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
val lines = input.map { case (_, text) => text.toString}
println(lines.collect)
例如,我的输入是一个包含一行aXbXcXd 的文件。上面的代码会输出
Array(a, b, c, d)
【讨论】:
在Spark shell中,我根据Setting textinputformat.record.delimiter in spark提取数据:
$ spark-shell
...
scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
scala> val conf = new Configuration
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
scala> conf.set("textinputformat.record.delimiter", "\u0001")
scala> val data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
data: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:19
sc.newAPIHadoopFile("mydata.txt", ...) 是一个RDD[(LongWritable, Text)],其中元素的第一部分是起始字符索引,第二部分是由"\u0001" 分隔的实际文本。
【讨论】:
在 python 中,这可以通过以下方式实现:
rdd = sc.newAPIHadoopFile(YOUR_FILE, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
conf={"textinputformat.record.delimiter": YOUR_DELIMITER}).map(lambda l:l[1])
【讨论】:
这里是 Chad 和 @zsxwing 为 Scala 用户提供的答案的现成版本,可以这样使用:
sc.textFile("some/path.txt", "\u0001")
以下 sn-p 使用 implicit class 创建一个附加的 textFile 方法隐式附加到 SparkContext(为了复制 SparkContext 的默认 textFile 方法):
package com.whatever
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
object Spark {
implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {
def textFile(
path: String,
delimiter: String,
maxRecordLength: String = "1000000"
): RDD[String] = {
val conf = new Configuration(sc.hadoopConfiguration)
// This configuration sets the record delimiter:
conf.set("textinputformat.record.delimiter", delimiter)
// and this one limits the size of one record:
conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)
sc.newAPIHadoopFile(
path,
classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
conf
)
.map { case (_, text) => text.toString }
}
}
}
可以这样使用:
import com.whatever.Spark.ContextExtensions
sc.textFile("some/path.txt", "\u0001")
注意附加设置mapreduce.input.linerecordreader.line.maxlength,它限制了记录的最大大小。当从损坏的文件中读取记录可能太长而无法放入内存(使用记录分隔符时发生这种情况的可能性更大)时,这会派上用场。
使用此设置,当读取损坏的文件时,将抛出异常(java.io.IOException - 因此可捕获),而不是导致内存不足而停止 SparkContext。
【讨论】:
如果您使用的是 spark-context,下面的代码对我有帮助
sc.hadoopConfiguration.set("textinputformat.record.delimiter","delimeter")
【讨论】:
spark.sparkContext._jsc.hadoopConfiguration().set("textinputformat.record.delimiter","delimeter")