【问题标题】:Error with spark-csv on Amazon EMR ClusterAmazon EMR 集群上的 spark-csv 错误
【发布时间】:2017-01-24 19:41:49
【问题描述】:

我正在尝试通过简单的 Spark 步骤执行来运行 EMR 集群,但遇到了无法解决的错误。该程序在我在 Eclipse 中本地运行时有效,但在 EMR 集群上运行时无效。该程序只是尝试将 S3 上的 CSV 文件转换为 Parquet 格式。

当我在 EMR 中运行时,我收到以下错误:

原因:com.univocity.parsers.common.TextParsingException:已解析输入的长度 (1000001) 超过了解析器设置中定义的最大字符数 (1000000)。 在解析的内容中识别出行分隔符。这可能是错误的原因。解析器设置中的行分隔符设置为“\n”。解析内容:

我没有任何超过 1000000 限制的字段。我已尝试从 s3、s3n 和 s3a 位置读取数据。

    import org.apache.spark.SparkSession
    import org.apache.spark.sql.types._

    object TestEMR {
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate()
        val schema = StructType(
            Array(
              StructField("field1", StringType ,nullable = true),
              StructField("field2", IntegerType ,nullable = true),
              StructField("field3", IntegerType ,nullable = true),
              StructField("field4", TimestampType ,nullable = true),
              StructField("field5", TimestampType ,nullable = true),
              StructField("field6", StringType ,nullable = true),
              StructField("field7", StringType ,nullable = true),
              StructField("field8", StringType ,nullable = true),
              StructField("field9", StringType ,nullable = true),
              StructField("field10", StringType ,nullable = true),
              StructField("field11", StringType ,nullable = true),
              StructField("field12", StringType ,nullable = true),
              StructField("field13", StringType ,nullable = true),
              StructField("field14", StringType ,nullable = true),
              StructField("field15", StringType ,nullable = true),
              StructField("field16", StringType ,nullable = true),
              StructField("field17", StringType ,nullable = true),
              StructField("field18", StringType ,nullable = true),
              StructField("field19", StringType ,nullable = true),
              StructField("field20", StringType ,nullable = true)
            )
          )

        val df = spark.read
            .format("com.databricks.spark.csv")
            .schema(schema)
            .option("nullValue","")
            .option("treatEmptyValuesAsNulls","true")
            .load("s3://mybucket/input/myfile.csv")
       df.write.mode("append").parquet("s3://mybucket/output/myfile")
       spark.stop
      }
    }

【问题讨论】:

    标签: apache-spark amazon-emr spark-csv


    【解决方案1】:

    听起来它没有找到行尾,因此不断读取,直到达到单行 10K 字符的限制。

    正如他们所说:检查该文件的换行符

    【讨论】:

    • 文件没问题。加载操作只是没有在换行符上拆分文件。我能够将代码转换为 sc.texfile(myfile) 并且它可以很好地读取文件。
    • 有趣。 FWIW Spark 2 有一个内置的 CSV 解析器,这意味着您可以针对 spark 团队提交 JIRA。我运行的 s3a 集成测试确实使用 s3 上的 .csv.gz 文件和该模块
    【解决方案2】:

    这个问题在spark-csv jira 中仍然存在。如果您没有数据问题或读取为 RDD,然后创建数据帧,他们提供了解决方法,例如使用开放的 csv 解析器。

    val rdd = sc.textFile("file.csv")
    // Here, filtering or transformation
    //val filteredRDD = rdd.filter..
    //val transformedRDD = rdd.map..
    
    val df = new CsvParser().csvRdd(sqlContext, transformedRDD)
    

    【讨论】:

      猜你喜欢
      • 2017-07-17
      • 2016-04-19
      • 1970-01-01
      • 1970-01-01
      • 2013-06-14
      • 1970-01-01
      • 2018-09-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多