【问题标题】:Spark: Reading files using different delimiter than new lineSpark:使用不同于换行符的分隔符读取文件
【发布时间】:2014-10-05 05:37:55
【问题描述】:

我使用的是 Apache Spark 1.0.1。我有许多用 UTF8 \u0001 分隔的文件,而不是通常的新行 \n。如何在 Spark 中读取此类文件?意思是sc.textfile("hdfs:///myproject/*")的默认分隔符是\n,我想改成\u0001

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    您可以使用textinputformat.record.delimiterTextInputFormat设置分隔符,例如,

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("textinputformat.record.delimiter", "X")
    val input = sc.newAPIHadoopFile("file_path", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val lines = input.map { case (_, text) => text.toString}
    println(lines.collect)
    

    例如,我的输入是一个包含一行aXbXcXd 的文件。上面的代码会输出

    Array(a, b, c, d)
    

    【讨论】:

    • 当我在 spark-shell 中运行上述代码时,出现以下错误: scala> val job = new Job(sc.hadoopConfiguration) 警告:有 1 个弃用警告;使用 -deprecation 重新运行以获取详细信息 java.lang.IllegalStateException:在 org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283) 状态为 DEFINE 而不是 RUNNING 的作业如何修复此“java.lang.IllegalStateException : 工作状态为 DEFINE 而不是 RUNNING”的问题?
    • 能否将完整的堆栈轨道粘贴到某个位置并提供链接?
    【解决方案2】:

    在Spark shell中,我根据Setting textinputformat.record.delimiter in spark提取数据:

    $ spark-shell
    ...
    scala> import org.apache.hadoop.io.LongWritable
    import org.apache.hadoop.io.LongWritable
    
    scala> import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.Text
    
    scala> import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.conf.Configuration
    
    scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    
    scala> val conf = new Configuration
    conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
    
    scala> conf.set("textinputformat.record.delimiter", "\u0001")
    
    scala> val data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
    data: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:19
    

    sc.newAPIHadoopFile("mydata.txt", ...) 是一个RDD[(LongWritable, Text)],其中元素的第一部分是起始字符索引,第二部分是由"\u0001" 分隔的实际文本。

    【讨论】:

      【解决方案3】:

      在 python 中,这可以通过以下方式实现:

      rdd = sc.newAPIHadoopFile(YOUR_FILE, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                  "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
                  conf={"textinputformat.record.delimiter": YOUR_DELIMITER}).map(lambda l:l[1])
      

      【讨论】:

        【解决方案4】:

        这里是 Chad@zsxwing 为 Scala 用户提供的答案的现成版本,可以这样使用:

        sc.textFile("some/path.txt", "\u0001")
        

        以下 sn-p 使用 implicit class 创建一个附加的 textFile 方法隐式附加到 SparkContext(为了复制 SparkContext 的默认 textFile 方法):

        package com.whatever
        
        import org.apache.spark.SparkContext
        import org.apache.spark.rdd.RDD
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.io.{LongWritable, Text}
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
        
        object Spark {
        
          implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {
        
            def textFile(
                path: String,
                delimiter: String,
                maxRecordLength: String = "1000000"
            ): RDD[String] = {
        
              val conf = new Configuration(sc.hadoopConfiguration)
        
              // This configuration sets the record delimiter:
              conf.set("textinputformat.record.delimiter", delimiter)
              // and this one limits the size of one record:
              conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)
        
              sc.newAPIHadoopFile(
                  path,
                  classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
                  conf
                )
                .map { case (_, text) => text.toString }
            }
          }
        }
        

        可以这样使用:

        import com.whatever.Spark.ContextExtensions
        sc.textFile("some/path.txt", "\u0001")
        

        注意附加设置mapreduce.input.linerecordreader.line.maxlength,它限制了记录的最大大小。当从损坏的文件中读取记录可能太长而无法放入内存(使用记录分隔符时发生这种情况的可能性更大)时,这会派上用场。

        使用此设置,当读取损坏的文件时,将抛出异常(java.io.IOException - 因此可捕获),而不是导致内存不足而停止 SparkContext。

        【讨论】:

          【解决方案5】:

          如果您使用的是 spark-context,下面的代码对我有帮助 sc.hadoopConfiguration.set("textinputformat.record.delimiter","delimeter")

          【讨论】:

          • 如果 Spark >= 2.0,使用spark.sparkContext._jsc.hadoopConfiguration().set("textinputformat.record.delimiter","delimeter")
          猜你喜欢
          • 2019-04-01
          • 2017-01-20
          • 1970-01-01
          • 2020-03-20
          • 1970-01-01
          • 2020-06-30
          • 1970-01-01
          • 1970-01-01
          • 2020-11-21
          相关资源
          最近更新 更多