【发布时间】:2020-11-23 16:15:59
【问题描述】:
我正在尝试从此 Scala 代码写入 csv 文件。我使用 HDFS 作为临时目录,然后只需 writer.write 在现有子文件夹中创建一个新文件。我收到以下错误消息:
val inputFile = "s3a:/tfsdl-ghd-wb/raidnd/rawdata.csv" // INPUT path
val outputFile = "s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv" // OUTPUT path
val dateFormat = new SimpleDateFormat("yyyyMMdd")
val fileSystem = getFileSystem(inputFile)
val inputData = readCSVFile(fileSystem, inputFile, skipHeader = true).toSeq
val writer = new PrintWriter(new File(outputFile))
writer.write("Sales,cust,Number,Date,Credit,SKU\n")
filtinp.foreach(x => {
val (com1, avg1) = com1Average(filtermp, x)
val (com2, avg2) = com2Average(filtermp, x)
writer.write(s"${x.Date},${x.cust},${x.Number},${x.Credit}\n")
})
writer.close()
def getFileSystem(path: String): FileSystem = {
val hconf = new Configuration() // initialize new hadoop configuration
new Path(path).getFileSystem(hconf) // get new filesystem to handle data
java.io.FileNotFoundException: s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv(没有这样的文件或目录)
如果我选择新文件或退出文件也会发生同样的情况,我检查了路径是否正确,只是想在其中创建一个新文件。
问题是为了使用基于文件系统的源写入数据,您需要一个临时目录,这是 Spark 使用的提交机制的一部分,即数据首先写入临时目录,一旦任务完成完成后,自动将处理后的文件移动到最终路径。
我是否应该将每个 Spark 应用程序的临时文件夹的路径更改为 S3?我认为最好在本地处理(本地文件 HDFS)然后将处理后的输出文件上传到 S3
我也只是看到我正在使用的数据块集群中没有“没有 Spark 配置集”,这会干扰问题吗?
【问题讨论】:
-
您能否粘贴您的完整代码,因为此代码看起来不完整,无法理解您要执行的操作?
-
只计算按客户和 SKU 分组的平均值
标签: scala apache-spark hdfs