【问题标题】:working with arraytype in spark Dataframe在 spark Dataframe 中使用 arraytype
【发布时间】:2018-11-07 05:12:37
【问题描述】:

我的要求是将 DataFrame 中的所有 Decimal 数据类型转换为 String。 Logic 在简单类型上工作正常,但在 ArrayType 上不工作。 这是逻辑:-

var df = spark.sql("select * from test_1")
for(dt <- df.dtypes) {
  if(dt._2.substring(0,7) == "Decimal"){
    df = df.withColumn(dt._1,df(dt._1).cast("String"))  
  }
}

但是arrayType中的列保持不变,尽管它们是十进制类型。请帮忙,我如何循环嵌套元素并将其转换为字符串。 这是我的数据框的架构:

scala> df.schema res77: org.apache.spark.sql.types.StructType = StructType(StructField(mstr_prov_id,StringType,true), StructField(prov_ctgry_cd,StringType,true), StructField(prov_orgnl_efctv_dt,TimestampType,true), StructField(prov_trmntn_dt,TimestampType,true), StructField(prov_trmntn_rsn_cd,StringType,true), StructField(npi_rqrd_ind,StringType,true), StructField(prov_stts_aray_txt,ArrayType(StructType(StructField(PROV_STTS_KEY,DecimalType(22,0),true), StructField(PROV_STTS_EFCTV_DT,TimestampType,true), StructField(PROV_STTS_CD,StringType,true), StructField(PROV_STTS_TRMNTN_DT,TimestampType,true), StructField(PROV_STTS_TRMNTN_RSN_CD,StringType,true)),true),true))

【问题讨论】:

  • 您必须编写一个 UDF,将数组中的每个小数转换为 string.it,因为列的类型是小数数组
  • 你能分享这个需求的示例代码吗?我正在寻找实现此任务的 UDF。
  • 即使我知道,我仍需要一个 UDF,它会遍历数组元素,并将所有 Decimal 类型转换为 String。但我不知道如何编写该代码。因此,寻找相同的示例代码(UDF)。
  • 您想将 prov_stts_aray_txt 列中的所有数据更改为字符串?是这样吗?
  • @RameshMaharjan ,prov_stts_aray_txt 中只有十进制类型的字段,必须转换为字符串。我尝试了很多,但找不到任何线索。

标签: scala apache-spark apache-spark-sql


【解决方案1】:

您还可以强制转换复杂类型,例如如果您有这样的模式的数据框:

root
 |-- arr: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- i: decimal(22,0) (nullable = true)
 |    |    |-- j: double (nullable = false)

您可以通过执行以下操作强制转换所有十进制类型的数组元素(字段i n 本示例):

df
  .select($"arr".cast("array<struct<i:string,j:double>>"))
  .printSchema()

root
 |-- arr: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- i: string (nullable = true)
 |    |    |-- j: double (nullable = true)

编辑: 如果事先不知道schema,可以将原schema中的decimal替换成string即可:

val arraySchema = df.schema.fields(0).dataType.simpleString
val castedSchema = arraySchema.replaceAll("decimal\\(.*\\)","string")

df
  .select($"arr".cast(castedSchema))
  .show()

【讨论】:

  • 这不是我的要求。我必须遍历 arrayType 的元素,并将所有 Decimal 类型转换为 String 类型...
  • @Vinitkumar 以及为什么您需要循环播放?
  • 我必须将所有 Decimal 类型转换为 String。并且在 ArrayType 中有些字段是 Decimal。
【解决方案2】:

如果您使用的是 spark 2.1 及更高版本,那么下面的转换应该适合您

val newSchema = DataType.fromJson(df.schema.json.replaceAll("(decimal\\(\\d+,\\d+\\))", "string")).asInstanceOf[StructType]
df.select(newSchema.map(field => col(field.name).cast(field.dataType)): _*)

应该将所有十进制类型转换为字符串类型。

但是,如果您使用的 spark 版本低于上述版本,并且由于结构列中有 timestamp 数据类型,您将遇到

TimestampType (of class org.apache.spark.sql.types.TimestampType$) scala.MatchError: TimestampType (of class org.apache.spark.sql.types.TimestampType$)

它是 casting structs fails on Timestamp fields 并已解决 cast struct with timestamp field fails

【讨论】:

  • 在同一行中还有一个帮助,我可以遍历平面字段并执行替换等操作。但同样它不是遍历嵌套字段。如何遍历包括嵌套字段在内的字段并根据需要执行表达式...提前致谢..
  • 上述解决方案适用于每个十进制数据类型,即使在嵌套结构和数组类型中也是如此。在我回答之前我自己确认了:)
【解决方案3】:

试试这个(你与 == 的比较可能不是你想要的)

var df = spark.sql("select * from test_1")
for(dt <- df.dtypes) {
  if("Decimal".equalsIgnoreCase(dt._2.substring(0,Math.min(7, dt._2.length)))){
    df = df.withColumn(dt._1,df(dt._1).cast("String"))  
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-12-07
    • 1970-01-01
    • 1970-01-01
    • 2017-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多