【问题标题】:Spark avro to parquetSpark avro 到镶木地板
【发布时间】:2016-07-04 20:32:30
【问题描述】:

我有一个需要存储为镶木地板文件的 avro 格式数据流(json 编码)。我只能这样做,

val df = sqc.read.json(jsonRDD).toDF()

并将df写成parquet。

这里的模式是从 json 中推断出来的。但我已经有了 avsc 文件,我不想让 spark 从 json 推断架构。

以上述方式,parquet 文件将架构信息存储为 StructType 而不是 avro.record.type。有没有办法存储 avro 模式信息。

火花 - 1.4.1

【问题讨论】:

    标签: scala apache-spark spark-dataframe avro parquet


    【解决方案1】:

    最终使用了这个问题的答案avro-schema-to-spark-structtype

    def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
        val dummyFIle = File.createTempFile("avro_dummy", "avro")
        val datumWriter = new GenericDatumWriter[wuser]()
        datumWriter.setSchema(avroSchema)
        val writer = new DataFileWriter(datumWriter).create(avroSchema, dummyFIle)
        writer.flush()
        writer.close()
        val df = sqc.read.format("com.databricks.spark.avro").load(dummyFIle.getAbsolutePath)
        df.schema
    }
    

    【讨论】:

      【解决方案2】:

      您可以通过编程方式指定架构

      // The schema is encoded in a string
      val schemaString = "name age"
      
      // Import Row.
      import org.apache.spark.sql.Row;
      
      // Import Spark SQL data types
      import org.apache.spark.sql.types.{StructType,StructField,StringType};
      
      // Generate the schema based on the string of schema
      val schema =
        StructType(
          schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
      
      // Convert records of the RDD (people) to Rows.
      val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
      
      // Apply the schema to the RDD.
      val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
      

      请看:http://spark.apache.org/docs/latest/sql-programming-guide.html

      spark-avro 然后使用模式类型来指定 avro 类型,如下所示

      • Spark SQL 类型 -> Avro 类型
      • ByteType -> int
      • ShortType -> int
      • DecimalType -> 字符串
      • BinaryType -> 字节
      • TimestampType -> 长
      • StructType -> 记录

      您可以按如下方式编写 Avro 记录:

      import com.databricks.spark.avro._
      
      val sqlContext = new SQLContext(sc)
      
      import sqlContext.implicits._
      
      val df = Seq((2012, 8, "Batman", 9.8),
              (2012, 8, "Hero", 8.7),
              (2012, 7, "Robot", 5.5),
              (2011, 7, "Git", 2.0))
              .toDF("year", "month", "title", "rating")
      
      df.write.partitionBy("year", "month").avro("/tmp/output")
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-06-16
        • 1970-01-01
        • 2019-11-20
        • 2017-03-17
        • 2020-03-11
        • 1970-01-01
        相关资源
        最近更新 更多