【问题标题】:Pyspark delta lake json datatype evolution issue ( merge incompatible exception )Pyspark delta Lake json 数据类型演变问题(合并不兼容异常)
【发布时间】:2021-07-10 16:15:21
【问题描述】:

我正在研究 pyspark (3.x) 和 delta Lake。我在数据类型方面面临一些挑战。 我们以 JSON 数据类型接收数据,我们正在对 JSON 数据集进行一些扁平化并将其保存为增量表,其中选项为“mergeSchema”,如下所示。我们不会在表格上强加任何架构。

df.write\
    .format("delta")\
    .partitionBy("country","city")\
    .option("mergeSchema","true")\
    .mode("append")\
    .save(delta_path)\

我们面临的问题是 - JSON 字段的数据类型经常变化,例如在增量表中,“field_1”的数据类型为 StringType,但新 JSON 的“field_1”数据类型为 LongType .因此,我们遇到了合并不兼容的异常。

ERROR : Failed to merge fields 'field_1' and 'field_1'. Failed to merge incompatible data types StringType and LongType

如何处理增量表中的这种数据类型演变,我不想在字段级别处理数据类型更改,因为我们有 300 多个字段作为 json 的一部分。

【问题讨论】:

    标签: json pyspark jsonschema delta-lake


    【解决方案1】:

    我也采取了类似于nilesh1212的方式,即手动合并schema。

    在我的例子中,我的脚本可以处理嵌套类型,可以在这里找到: https://github.com/miguellobato84/spark-delta-schema-evolution

    另外,我写了这篇关于这个问题的文章 https://medium.com/@miguellobato84/improving-delta-lake-schema-evolution-2cce8db2f0f5

    【讨论】:

      【解决方案2】:

      为了解决我的问题,我编写了一个新函数,它本质上合并了 delta 表的架构(如果存在 delta 表)和 JSON 架构。

      在高层次上,我创建了一个新架构 - 这个新架构本质上是 delta Lake 表中的公共列和 JSON 字段中的新列的组合,通过创建这个新架构,我通过应用这个新架构重新创建了一个数据框架构。 这解决了我的问题。

      def get_merged_schema(delta_table_schema, json_data_schema):
          
          print('str(len(delta_table_schema.fields)) -> ' + str(len(delta_table_schema.fields)))
          print('str(len(json_data_schema.fields)) -> '+ str(len(json_data_schema.fields)))
          
          no_commom_elements=False
          no_new_elements=False
          import numpy as np
          struct_field_array=[]
          if len(set(delta_table_schema.names).intersection(set(json_data_schema.names))) > 0:
              common_col=set(delta_table_schema.names).intersection(set(json_data_schema.names))
              print('common_col len: -> '+ str(len(common_col)))
              for name in common_col:
                  for f in delta_table_schema.fields:
                    if(f.name == name):
                        struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
          else:
              no_commom_elements=True
              print("no common elements")
      
          if len(np.setdiff1d(json_data_schema.names,delta_table_schema.names)) > 0:
              diff_list = np.setdiff1d(json_data_schema.names,delta_table_schema.names)
              print('diff_list len: -> '+ str(len(diff_list)))
              for name in diff_list:
                  for f in json_data_schema.fields:
                      if(f.name == name):
                          struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
          else:
              no_new_elements=True
              print("no new elements")
            
          print('len(StructType(struct_field_array)) -> '+str(len(StructType(struct_field_array))))
          df=spark.createDataFrame(spark.sparkContext.emptyRDD(),StructType(struct_field_array))
          if no_commom_elements and no_new_elements: 
            return StructType(None) 
          else: 
            return df.select(sorted(df.columns)).schema
      

      【讨论】:

        【解决方案3】:

        根据文章Diving Into Delta Lake: Schema Enforcement & EvolutionmergeSchema=true可以处理以下场景:

        • 添加新列(这是最常见的场景)
        • 从 NullType -> 任何其他类型更改数据类型,或从 ByteType -> ShortType -> IntegerType 向上转换

        这篇文章还暗示了你可以做什么:

        “其他不符合架构演化条件的更改需要通过添加.option("overwriteSchema", "true") 来覆盖架构和数据。例如,在列“Foo”最初是整数数据类型的情况下并且新架构将是字符串数据类型,则需要重写所有 Parquet(数据)文件。这些更改包括:"

        • 删除一列
        • 更改现有列的数据类型(就地)
        • 重命名仅区分大小写的列名(例如“Foo”和“foo”)

        【讨论】:

        • 这并没有完全解决我的问题,我通过合并 delta 表模式和 json 模式创建了一个新模式,通过合并 delta 和 json 模式,我的所有 json 类型的问题都得到了解决。跨度>
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-11-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多