【问题标题】:Convert GenericRecord to DF将 GenericRecord 转换为 DF
【发布时间】:2018-11-30 22:46:02
【问题描述】:

我在流式传输中有一个要求,我必须将 GenericRecord 转换为 DatFrame,以便我可以使用 EXPLODE 和 DF 中可用的其他功能。所以首先,我正在研究如何将 GenericRecord 转换为 DF。

我检查了下面的 URL,它有助于将记录转换为 DF。但我无法理解如何将类 SchemaConverterUtils 添加到 avro 对象。

How to convert RDD[GenericRecord] to dataframe in scala?

当我尝试编辑时,它给了我只读文件。我是 scala/java 的新手。你能帮我理解如何做到这一点。

谢谢

【问题讨论】:

标签: scala apache-spark avro spark-structured-streaming


【解决方案1】:

关于该帖子,spark-avro 库已被 DataBricks 弃用并捐赠给 Spark。

ABRiS 库提供了一个 UDF,用于将 Array[Byte] 的列转换为复杂类型的列,并最终转换为 DataFrame

在您的情况下,您应该先进行几次转换。

import org.apache.spark.sql.DataFrame
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.examples.data.generation.AvroDataUtils

val spark: SparkSession = SparkSession
    .builder().master("local[*]").getOrCreate()
// read data into an RDD of GenericRecord called "genericRecordRdd"
// Have your schema in string format in a variable called "stringSchema"
import spark.implicits._
val domainDF: DataFrame = genericRecordRdd
        .map(AvroDataUtils.recordToBytes)
        .toDF("value")
        .select(from_avro(col("value"), stringSchema) as 'data).select("data.*")

AvroDataUtils.recordToBytes 是 ABRiS 库的一部分,它将 GenericRecord 对象转换为 Array[Byte]。然后你创建一个只有一列的DataFrame,它被称为"value"。此时,您已准备好使用from_avro UDF。按照网站上的文档,您确实有其他选择,但从您的描述来看,我认为这是最接近的。

【讨论】:

  • 我们可以使用 abris 库将通用记录转换为 spark 3.0.1 中的 spark 行吗?
猜你喜欢
  • 2022-11-07
  • 1970-01-01
  • 1970-01-01
  • 2018-07-27
  • 1970-01-01
  • 2022-08-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多