【问题标题】:How to convert Avro Schema object into StructType in spark如何在 Spark 中将 Avro Schema 对象转换为 StructType
【发布时间】:2017-04-08 22:04:48
【问题描述】:

我有一个 Row 类型的 RDD,即 RDD[Row] 和 avro 模式对象。我需要使用此信息创建一个数据框。

我需要将 avro 模式对象转换为 StructType 以创建 DataFrame。

你能帮忙吗?

【问题讨论】:

标签: apache-spark schema rdd avro


【解决方案1】:

com.databricks.spark.avro 有一个类可以帮助你解决这个问题

 StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();

请看这个具体的例子:http://bytepadding.com/big-data/spark/read-write-parquet-files-using-spark/

【讨论】:

    【解决方案2】:

    于 2020 年 5 月 31 日更新

    如果你在 scala 2.12 上使用更新的 spark 版本,请在下面使用。

    sbt:

    scalaVersion := "2.12.11"
    val sparkVersion = "2.4.5"
    libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion
    
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType
    
    val schemaType = SchemaConverters
      .toSqlType(avroSchema)
      .dataType
      .asInstanceOf[StructType]
    

    【讨论】:

      【解决方案3】:

      在 pyspark 2.4.7 中,我的解决方案是使用 avroschema 创建一个空数据帧,然后从该空数据帧中获取 StructType 对象。

      with open('/path/to/some.avsc','r') as avro_file:
          avro_scheme = avro_file.read()
      
      df = spark\
          .read\
          .format("avro")\
          .option("avroSchema", avro_scheme)\
          .load()
      
      struct_type = df.schema
      
      

      【讨论】:

        【解决方案4】:

        Wisnia 的答案有效,但仅供参考,我和我的同事提出的另一个解决方案如下:

        avro_schema = "..."
        
        java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
            spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
        )
        
        java_struct_schema = java_schema_type.dataType()
        struct_json_schema = java_struct_schema.json()
        json_schema_obj = json.loads(struct_json_schema)
        schema = StructType.fromJson(json_schema_obj)
        

        【讨论】:

          【解决方案5】:

          Databrics 支持 spark-avro 包中的 avro 相关实用程序,在 sbt 中使用以下依赖项 "com.databricks" % "spark-avro_2.11" % "3.2.0"

          代码

          *

          val sqlSchema=SchemaConverters.toSqlType(avroSchema)

          *

          在 '3.2.0' 版本之前,'toSqlType' 是私有方法,所以如果您使用的是 3.2 之前的版本,请在您自己的 util 类中复制完整的方法,否则升级到最新版本。

          【讨论】:

          • 为什么在回答此类问题时没有人指出需要导入的包?
          【解决方案6】:

          在 pyspark 中做同样的事情的任何例子?下面的代码对我有用,但应该有其他更简单的方法来做到这一点

          # pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4
          
          import requests
          import os
          import avro.schema
          
          from pyspark.sql.types import StructType
          
          schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
          schema_requests = requests.get(url=schema_registry_url)
          
          spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text))
          

          【讨论】:

            猜你喜欢
            • 2019-05-05
            • 2019-12-17
            • 2016-02-27
            • 2020-07-18
            • 2017-06-03
            • 1970-01-01
            • 2020-02-26
            • 2018-10-21
            • 2020-01-15
            相关资源
            最近更新 更多