【问题标题】:Converting StructType to Avro Schema, returns type as Union when using databricks spark-avro将 StructType 转换为 Avro Schema,使用 databricks spark-avro 时返回类型为 Union
【发布时间】:2019-05-05 20:47:14
【问题描述】:

我正在使用 databricks spark-avro 将数据帧架构转换为 avro 架构。返回的 avro 架构没有默认值。当我尝试从架构中创建通用记录时,这会导致问题。谁能帮忙看看这个功能的正确使用方法?

Dataset<Row> sellableDs = sparkSession.sql("sql query");
SchemaBuilder.RecordBuilder<Schema> rb = SchemaBuilder.record("testrecord").namespace("test_namespace");
Schema sc = SchemaConverters.convertStructToAvro(sellableDs.schema(), rb, "test_namespace");
System.out.println(sc.toString());
System.out.println(sc.getFields().get(0).toString());
String schemaString = sc.toString();
sellableDs.foreach(
    (ForeachFunction<Row>) row -> {
        Schema scEx = new Schema.Parser().parse(schemaString);
        GenericRecord gr;
        gr = new GenericData.Record(scEx);
        System.out.println("Generic record Created");
        int fieldSize = scEx.getFields().size();
        for (int i = 0; i < fieldSize; i++ ) {
            // System.out.println( row.get(i).toString());
            System.out.println("field: " + scEx.getFields().get(i).toString() + "::" + "value:" + row.get(i));
            gr.put(scEx.getFields().get(i).toString(), row.get(i));
            //i++;
        }
    }
);

这是 df 架构:

StructType(StructField(key,IntegerType,true), StructField(value,DoubleType,true))

这是 avro 转换后的架构:

{"type":"record","name":"testrecord","namespace":"test_namespace","fields":[{"name":"key","type":["int","null"]},{"name":"value","type":["double","null"]}]}

【问题讨论】:

    标签: apache-spark-sql schema avro databricks spark-avro


    【解决方案1】:

    问题在于SchemaConverters 类不包含默认值作为架构创建的一部分。您有 2 个选项,在创建记录之前修改架构添加默认值或在使用某些值构建之前填充记录(它实际上可能是您的行中的值)。例如空。这是一个如何使用您的架构创建记录的示例

    import org.apache.avro.generic.GenericRecordBuilder 
    import org.apache.avro.Schema
    var schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"testrecord\",\"namespace\":\"test_namespace\",\"fields\":[{\"name\":\"key\",\"type\":[\"int\",\"null\"]},{\"name\":\"value\",\"type\":[\"double\",\"null\"]}]}")
    
    
    var  builder = new GenericRecordBuilder(schema);
    
    for (i <- 0 to schema.getFields().size() - 1 ) {
      builder.set(schema.getFields().get(i).name(), null)
    }
    
    var record = builder.build();
    print(record.toString())
    

    【讨论】:

    • 感谢您的意见。我通过将模式字段转换为字符串而不是推断名称而犯了错误。感谢您的帮助
    猜你喜欢
    • 2017-04-08
    • 1970-01-01
    • 2019-12-17
    • 2016-02-27
    • 2016-08-19
    • 1970-01-01
    • 2016-11-03
    • 1970-01-01
    • 2015-10-31
    相关资源
    最近更新 更多