【问题标题】:Change hdfs write path dynamically in spark structured streaming在 Spark 结构化流中动态更改 hdfs 写入路径
【发布时间】:2019-09-02 22:05:10
【问题描述】:

我有一个 spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流似乎不能那样工作。它只会创建一个应用程序启动日期的文件夹,即使日期发生变化,也会继续写入同一文件夹。有什么方法可以根据当前日期动态更改路径吗?

下面是我的 writestream 的样子

 val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
 val outPath = "maindir/sb_topic/data/loaddate="

val dswWriteStream =dfresult.writeStream
    .outputMode(outputMode) 
    .format(writeformat) 
    .option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
    .option("checkpointLocation", checkpointdir) 
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime("10 minutes")) 

【问题讨论】:

  • 最好将您的答案发布到下面并自行接受。

标签: apache-spark spark-streaming spark-structured-streaming


【解决方案1】:

解决方案:我通过将当前日期列(例如“loaddate”)添加到父数据帧“dfresult”然后按该列对写入流进行分区来解决此问题。

dswWriteStream.partitionBy('loaddate')

【讨论】:

    猜你喜欢
    • 2014-07-27
    • 2020-03-18
    • 1970-01-01
    • 2018-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-14
    • 1970-01-01
    相关资源
    最近更新 更多