【发布时间】:2019-09-02 22:05:10
【问题描述】:
我有一个 spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流似乎不能那样工作。它只会创建一个应用程序启动日期的文件夹,即使日期发生变化,也会继续写入同一文件夹。有什么方法可以根据当前日期动态更改路径吗?
下面是我的 writestream 的样子
val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outPath = "maindir/sb_topic/data/loaddate="
val dswWriteStream =dfresult.writeStream
.outputMode(outputMode)
.format(writeformat)
.option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
.option("checkpointLocation", checkpointdir)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("10 minutes"))
【问题讨论】:
-
最好将您的答案发布到下面并自行接受。
标签: apache-spark spark-streaming spark-structured-streaming