[Question Title]: Writing a Spark DataFrame to an Apache Hudi Table
[Posted]: 2021-03-21 23:58:56
[Question Description]:

I am new to Apache Hudi and I am trying to write my DataFrame into my Hudi table using the spark shell. For the first load I have not created any table and am writing in overwrite mode, so I expect it to create the Hudi table. I am running the code below.

    spark-shell \
    --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

    //Initialize a Spark Session for Hudi

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.spark.sql.SparkSession

    val spark1 = SparkSession.builder()
      .appName("hudi-datalake")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .getOrCreate()

    //Write to a Hudi Dataset 

    val inputDF = Seq(
    ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
    ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
    ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
    ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
    ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
    ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
    ).toDF("id", "creation_date", "last_update_time")

    val hudiOptions = Map[String,String](
    HoodieWriteConfig.TABLE_NAME -> "work.hudi_test",
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "work.hudi_test",
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
    //  Upsert Data
    // Create a new DataFrame from the first row of inputDF with a different creation_date value

    val updateDF = inputDF.limit(1).withColumn("creation_date", lit("2014-01-01"))

    updateDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).saveAsTable("work.hudi_test")

While executing this write statement, I get the error message below.

    java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

Please guide me on how to write this statement.

[Question Discussion]:

    Tags: apache-spark hive apache-hudi


    [Solution 1]:

    Here is a working example for your problem in pyspark:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit
    
    spark = (
        SparkSession.builder.appName("Hudi_Data_Processing_Framework")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2"
        )
        .getOrCreate()
    )
    
    input_df = spark.createDataFrame(
        [
            ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
            ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
            ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
            ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
            ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
            ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
        ],
        ("id", "creation_date", "last_update_time"),
    )
    
    hudi_options = {
        # ---------------DATA SOURCE WRITE CONFIGS---------------#
        "hoodie.table.name": "hudi_test",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "last_update_time",
        "hoodie.datasource.write.partitionpath.field": "creation_date",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.upsert.shuffle.parallelism": 1,
        "hoodie.insert.shuffle.parallelism": 1,
        "hoodie.consistency.check.enabled": True,
        "hoodie.index.type": "BLOOM",
        "hoodie.index.bloom.num_entries": 60000,
        "hoodie.index.bloom.fpp": 0.000000001,
        "hoodie.cleaner.commits.retained": 2,
    }
    
    # INSERT
    (
        input_df.write.format("org.apache.hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi_test")
    )
    
    # UPDATE
    update_df = input_df.limit(1).withColumn("creation_date", lit("2014-01-01"))
    (
        update_df.write.format("org.apache.hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi_test")
    )
    
    # REAL UPDATE
    update_df = input_df.limit(1).withColumn("last_update_time", lit("2016-01-01T13:51:39.340396Z"))
    (
        update_df.write.format("org.apache.hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi_test")
    )
    
    output_df = spark.read.format("org.apache.hudi").load(
        "/tmp/hudi_test/*/*"
    )
    
    output_df.show()
    

    Output: (screenshot in the original post)

    The Hudi table in the filesystem looks like this: (screenshot in the original post)

    Note: Your update operation actually creates a new partition and performs an insert, because you are modifying the partition column (2015-01-01 -> 2014-01-01). You can see this in the output.

    I have also provided a real update example that changes last_update_time to 2016-01-01T13:51:39.340396Z; this actually updates id 100 in partition 2015-01-01 from 2015-01-01T13:51:39.340396Z to 2016-01-01T13:51:39.340396Z.
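    To make both behaviors easy to see, here is a small read-back sketch (an assumption: the writes above have already run against /tmp/hudi_test); it inspects Hudi's _hoodie_partition_path metadata column, which Hudi adds to every record:

    # Read the table back and check which partitions hold record key 100.
    verify_df = spark.read.format("org.apache.hudi").load("/tmp/hudi_test/*/*")
    (
        verify_df.filter("id = '100'")
        .select("_hoodie_partition_path", "id", "creation_date", "last_update_time")
        .show(truncate=False)
    )
    # Expected: two rows for id 100 -- one inserted into the new 2014-01-01
    # partition (the partition-column change behaves as an insert), and the
    # original 2015-01-01 row whose last_update_time is now
    # 2016-01-01T13:51:39.340396Z (the real update).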

    More examples can be found in the Hudi Quick Start Guide.

    [Discussion]:

    • Hi @Felix K Jose, I am using your code; since it only works with the .save method, I changed the path to my HDFS location, but I am getting the error below.
    • File "<stdin>", line 5, in <module>
      File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 550, in save: self._jwrite.save(path)
      File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o330.save. ... at java.lang.ClassLoader.defineClass1(Native Method)
    • @Rpp What version of Spark are you using? This looks like a dependency/version issue. I am using Spark 3.0.2, package type: pre-built for Apache Hadoop 3.2 and later (spark-3.0.2-bin-hadoop3.2). Check your SPARK_HOME and make sure you are on Spark 3, not Spark 2.
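    As this comment suggests, the /usr/hdp/current/spark2-client path in the traceback points to a Spark 2 runtime, while hudi-spark-bundle_2.12:0.7.0 together with spark-avro_2.12:3.0.x expects Spark 3 (where org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2 is available). A minimal sanity-check sketch to run before writing:

    # Confirm the runtime Spark version matches the Hudi/spark-avro bundle line.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    print(spark.version)  # should print a 3.x version, e.g. "3.0.2", not 2.x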