【发布时间】:2022-08-07 23:12:44
【问题描述】:
我正在研究火花结构流式传输,其中工作消耗 Kafka 消息,每 10 秒进行一次聚合并将数据保存在 apache hudi 表中。下面的代码工作正常,但它会覆盖每批生成的 apache hudi 表数据。我还不明白为什么会这样?是火花结构流式传输还是 hudi 行为?我正在使用MERGE_ON_READ,因此不应在每次更新时删除表文件。但不知道为什么会这样?由于这个问题,我读到这张表的其他工作失败了。
spark.readStream
.format(\'kafka\')
.option(\"kafka.bootstrap.servers\",
\"localhost:9092\")
...
...
df1 = df.groupby(\'a\', \'b\', \'c\').agg(sum(\'d\').alias(\'d\'))
df1.writeStream
.format(\'org.apache.hudi\')
.option(\'hoodie.table.name\', \'table1\')
.option(\"hoodie.datasource.write.table.type\", \"MERGE_ON_READ\")
.option(\'hoodie.datasource.write.keygenerator.class\', \'org.apache.hudi.keygen.ComplexKeyGenerator\')
.option(\'hoodie.datasource.write.recordkey.field\', \"a,b,c\")
.option(\'hoodie.datasource.write.partitionpath.field\', \'a\')
.option(\'hoodie.datasource.write.table.name\', \'table1\')
.option(\'hoodie.datasource.write.operation\', \'upsert\')
.option(\'hoodie.datasource.write.precombine.field\', \'c\')
.outputMode(\'complete\')
.option(\'path\', \'/Users/lucy/hudi/table1\')
.option(\"checkpointLocation\",
\"/Users/lucy/checkpoint/table1\")
.trigger(processingTime=\"10 second\")
.start()
.awaitTermination()
标签: pyspark apache-kafka spark-structured-streaming apache-hudi