【问题标题】:Spark Avro to Parquet WriterSpark Avro 到 Parquet Writer
【发布时间】:2016-11-22 14:22:25
【问题描述】:

问题:对象不可序列化

请您看看如何解决这个问题。能够像正确打印一样正确阅读它。但是在将记录写入镶木地板时得到

对象不可序列化

引起:java.io.NotSerializableException: parquet.avro.AvroParquetWriter 序列化堆栈:- 对象不是 可序列化(类:parquet.avro.AvroParquetWriter,值: parquet.avro.AvroParquetWriter@658e7ead)

请查看并告诉我什么是最好的方法。

代码:将 Avro 记录转换为 Parquet

  val records = sc.newAPIHadoopRDD(conf.getConfiguration,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]], //Transforms the PairRDD to RDD 
  classOf[NullWritable]).map(x => x._1.datum) 

  // Build a schema
  val schema = SchemaBuilder
  .record("x").namespace("x")
  .fields
  .name("x").`type`().stringType().noDefault()
  .endRecord

val parquetWriter = new AvroParquetWriter[GenericRecord](new Path(outPath), schema)

val parquet  = new GenericRecordBuilder(schema)

records.foreach { keyVal =>
  val x = keyVal._1.datum().get("xyz") -- Field
     parquet.set("x", x)
        .build
      parquetWriter.write(schema.build())
    }

【问题讨论】:

    标签: hadoop apache-spark hdfs avro parquet


    【解决方案1】:

    您可以从此处开始将 avro 读入数据帧 https://github.com/databricks/spark-avro

    // import needed for the .avro method to be added
    import com.databricks.spark.avro._
    
    val sqlContext = new SQLContext(sc)
    
    // The Avro records get converted to Spark typesca
    val df = sqlContext.read.avro("src/test/resources/episodes.avro")
    
    df.registerTempTable("tempTable")
    val sat = sqlContext.sql( //use lateral view explode )
    sat.write.parquet("/tmp/output")
    

    【讨论】:

    • //use lateral view explode 的目的是什么?为什么需要这个?
    • 您有一个包含三项内容的数组。通过使用横向视图爆炸,您可以展平该行,但它将是三行。除了作为数组的列之外,所有列都是相同的。这将具有三个不同的值。
    • 为什么不单独explode(没有侧视图)?我知道我可以单独使用explode,并且一直想知道为什么要使用横向视图。你也可以使用Dataset.flatMap
    【解决方案2】:

    我不确定您为什么要采用这种方法。但我会推荐一种不同的方法。如果您将 avro 文件放入 rdd,看起来就像您所做的那样。您可以创建一个模式,然后将 RDD 转换为数据帧,然后将数据帧写成 parquet。

    var avroDF = sqlContext.createDataFrame(avroRDD,avroSchema)
    avroDF
        .write
        .mode(SaveMode.Overwrite)
        .parquet("parquet directory to write file")
    

    【讨论】:

    • 感谢您的方法。但问题是这是数组、列表、映射的嵌套结构。非常大的嵌套 avro。因此,为了平整需要遍历所有元素并获得所需的任何内容。
    • 如果您赞成并接受其中一个答案,那就太好了。我回答了你问的每一个问题。 @Ankur
    • 我在 RDD 上使用 foreach 在 AvroParquetWriter 上遇到了同样的 NotSerializableException,因为我试图在写出的 Parquet 中使用一致的 Avro 模式。我还没有找到一种方法来使用 Spark 读取具有已知架构的 Avro 文件目录,然后使用相同的架构创建 Avro 数据的 Parquet 版本。我在 AvroParquetWriter 上尝试过 @transient lazy,但它不起作用。
    【解决方案3】:

    对于我的一些具有复杂结构和数组的复杂 Json,我使用 hive ql 横向视图爆炸。这是一个扁平化的复杂 json 示例。它一开始是 10 行,对于某些跟踪我可以得到 60 行,而有些我得到少于 5 行。这取决于它是如何爆炸的。

    val tenj = sqlContext.read.json("file:///home/marksmith/hive/Tenfile.json")
    
    scala> tenj.printSchema
    root
    
     |-- DDIVersion: string (nullable = true)
     |-- EndTimestamp: string (nullable = true)
     |-- Stalls: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- Stall: long (nullable = true)
     |    |    |-- StallType: string (nullable = true)
     |    |    |-- TraceTypes: struct (nullable = true)
     |    |    |    |-- ActiveTicket: struct (nullable = true)
     |    |    |    |    |-- Category: string (nullable = true)
     |    |    |    |    |-- Traces: array (nullable = true)
     |    |    |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |    |    |-- EndTime: string (nullable = true)
     |    |    |    |    |    |    |-- ID: string (nullable = true)
     |    |    |    |    |    |    |-- Source: string (nullable = true)
     |    |    |    |    |    |    |-- StartPayload: struct (nullable = true)
     |    |    |    |    |    |    |    |-- SubticketID: string (nullable = true)
     |    |    |    |    |    |    |    |-- TicketID: string (nullable = true)
     |    |    |    |    |    |    |    |-- TicketState: long (nullable = true)
     |    |    |    |    |    |    |-- StartTime: string (nullable = true)
    
    tenj.registerTempTable("ddis")
    
    
    val sat = sqlContext.sql(
        "select DDIVersion, StallsExp.stall, StallsExp.StallType, at.EndTime, at.ID, 
           at.Source, at.StartPayload.SubTicketId, at.StartPayload.TicketID, 
           at.StartPayload.TicketState, at.StartTime  
        from ddis 
          lateral view explode(Stalls) st as StallsExp 
          lateral view explode(StallsExp.TraceTypes.ActiveTicket.Traces) at1 as at")
    sat: org.apache.spark.sql.DataFrame = [DDIVersion: string, stall: bigint, StallType: string, EndTime: string, ID: string, Source: string, SubTicketId: string, TicketID: string, TicketState: bigint, StartTime: string]
    
    sat.count
    res22: Long = 10
    
    sat.show
    +----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+
    |DDIVersion|stall|StallType|             EndTime| ID|Source|SubTicketId|TicketID|TicketState|           StartTime|
    +----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+
    |  5.3.1.11|   15|    POPS4|2016-06-08T20:07:...|   | STALL|          0|     777|          1|2016-06-08T20:07:...|
    |  5.3.1.11|   14|    POPS4|2016-06-08T20:07:...|   | STALL|          0|     384|          1|2016-06-08T20:06:...|
    |  5.3.1.11|   13|    POPS4|2016-06-08T20:07:...|   | STALL|          0|  135792|          1|2016-06-08T20:06:...|
    |  5.0.0.28|   26|    POPS4|2016-06-08T20:06:...|   | STALL|          0|     774|          2|2016-06-08T20:03:...|
    

    【讨论】:

    • 谢谢马克。您能否提供读取嵌套 avro 并获取一些特定列并将其转储为 Parquet 格式的方法...
    猜你喜欢
    • 2019-09-25
    • 2015-08-30
    • 2015-05-11
    • 2017-11-08
    • 2022-10-14
    • 1970-01-01
    • 2016-09-02
    • 1970-01-01
    • 2019-01-24
    相关资源
    最近更新 更多