【问题标题】:Spark Delta Table Add new columns in middle Schema EvolutionSpark Delta Table 在中间 Schema Evolution 中添加新列
【发布时间】:2021-09-11 05:16:10
【问题描述】:

必须将具有新列的文件提取到现有表结构中。

create table sch.test (
name string ,
address string 
) USING DELTA 
--OPTIONS ('mergeSchema' 'true')
PARTITIONED BY (name)
LOCATION  '/mnt/loc/fold'
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true);

读取文件的代码: val df = spark.read .format("com.databricks.spark.csv") .option(“标题”,“真”) .load("/mnt/loc/fold")

display(df)

路径中的文件包含以下数据

name,address
raghu,india
raj,usa

在将其写入表格时,

 import org.apache.spark.sql.functions._
df.withColumn("az_insert_ts", current_timestamp())
.withColumn("exec_run_id",lit("233"))
.withColumn("az_inp_file_name",lit("24234filename"))
     .coalesce(12)
     .write
     .mode("append")
     .option("mergeSchema", "true")
     .format("delta")
     .saveAsTable("sch.test")
display(spark.read.table("sch.test"))

添加新列,

name,address,age
raghu,india,12
raj,usa,13

读取文件,

    val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load("/mnt/loc/fold")

display(df)

使用 insertInto 写入表时,

import org.apache.spark.sql.functions._
df.withColumn("az_insert_ts", current_timestamp())
.withColumn("exec_run_id",lit("233"))
.withColumn("az_inp_file_name",lit("24234filename"))
     .coalesce(12)
     .write
     .mode("append")
     .option("mergeSchema", "true")
     .format("delta")
     .insertInto("sch.test")
display(spark.read.table("sch.test"))

得到以下错误,

【问题讨论】:

  • 据我所知,databricks 表是不可变的。所以真的没有办法做到这一点。您需要删除旧表并使用新架构创建一个新表。

标签: azure apache-spark azure-databricks delta-lake


【解决方案1】:

overwriteSchema 设置为true 将清除旧架构并让您创建一个全新的表。

import org.apache.spark.sql.functions._
df.withColumn(""az_insert_ts"", current_timestamp())
.withColumn(""exec_run_id"",lit(""233""))
.withColumn(""az_inp_file_name"",lit(""24234filename""))
     .coalesce(12)
     .write
     .mode(""append"")
     .option(""overwriteSchema"", ""true"")
     .format(""delta"")
     .insertInto(""sch.test"")
display(spark.read.table(""sch.test""))

【讨论】:

  • 它也会清除数据吗?
猜你喜欢
  • 1970-01-01
  • 2021-07-26
  • 2016-08-27
  • 2022-01-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-11
  • 1970-01-01
相关资源
最近更新 更多