[Question Title]: Unable to write a non-partitioned table using Apache Hudi
[Posted]: 2022-03-21 04:32:42
[Question]:

I am using Apache Hudi to write a non-partitioned table to AWS S3 and sync it to Hive. These are the DataSourceWriteOptions being used:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.NonPartitionedExtractor

val hudiOptions: Map[String, String] = Map[String, String](
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "PERSON_ID",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "UPDATED_DATE",
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,
      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
      DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
    )
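
For context, the write itself is just the standard Hudi datasource call; a minimal sketch of how these options are applied is below (the DataFrame df, the table name, and the S3 base path are placeholders, not the real job):

import org.apache.spark.sql.SaveMode

df.write
  .format("hudi")
  .options(hudiOptions)
  // enable syncing the written table to the Hive metastore
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  // placeholder table name
  .option("hoodie.table.name", "person")
  .mode(SaveMode.Append)
  // placeholder S3 base path
  .save("s3://my-bucket/hudi/person")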

The write succeeds for a partitioned table, but fails when I try to write a non-partitioned table. Here is a snippet of the error output:

Caused by: java.lang.NullPointerException
        at org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath(HoodieInputFormatUtils.java:283)
        at org.apache.hudi.hadoop.InputPathHandler.parseInputPaths(InputPathHandler.java:100)
        at org.apache.hudi.hadoop.InputPathHandler.<init>(InputPathHandler.java:60)
        at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:81)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:83)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:82)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:152)
        at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:357)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:280)
        ... 41 more

Here is the code of HoodieInputFormatUtils.getTableMetaClientForBasePath():

/**
   * Extract HoodieTableMetaClient from a partition path(not base path).
   * @param fs
   * @param dataPath
   * @return
   * @throws IOException
   */
  public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
      metadata.readFromFS();
      levels = metadata.getPartitionDepth();
    }
    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
  }

Line 283 is the LOG.info() call, which is where the NullPointerException is thrown. So it looks like the configuration values supplied for the partitioning have been messed up somewhere. This code is running on AWS EMR:

Release label: emr-5.30.1
Hadoop distribution: Amazon 2.8.5
Applications: Hive 2.3.6, Spark 2.4.5

[Question Comments]:

  • I upvoted this last week because I was hitting the same problem. I have it working now; I think the only difference is that I don't specify HIVE_PARTITION_FIELDS_OPT_KEY, HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, or HIVE_STYLE_PARTITIONING_OPT_KEY. Does removing those solve your problem?

Tags: apache-spark hadoop hive apache-hudi


[Solution 1]:

I suspect PARTITIONPATH_FIELD_OPT_KEY and HIVE_PARTITION_FIELDS_OPT_KEY should be left unset. To verify your configuration, I suggest checking https://doc.hcs.huawei.com/usermanual/mrs/mrs_01_24035.html

  • hoodie.datasource.write.partitionpath.field and hoodie.datasource.hive_sync.partition_fields should be left blank
  • hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator
  • hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor

I was hitting the Hive sync problem with pySpark on Hudi 0.9.0, and the documentation above helped.
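
Translated into the _OPT_KEY constants used in the question, a non-partitioned configuration would look roughly like the sketch below (the record key and precombine fields are copied from the question; verify the constant names against your Hudi version):

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.NonPartitionedExtractor
import org.apache.hudi.keygen.NonpartitionedKeyGenerator

val nonPartitionedHudiOptions: Map[String, String] = Map(
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "PERSON_ID",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "UPDATED_DATE",
  // leave the partition-path and Hive partition-field options out entirely
  // instead of passing empty strings
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[NonpartitionedKeyGenerator].getName,
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName
)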

[Comments]:
