【问题标题】:Spark write operation HDFS using temporal path使用时间路径的 Spark 写入操作 HDFS
【发布时间】:2020-11-23 16:15:59
【问题描述】:

我正在尝试从此 Scala 代码写入 csv 文件。我使用 HDFS 作为临时目录,然后只需 writer.write 在现有子文件夹中创建一个新文件。我收到以下错误消息:

val inputFile = "s3a:/tfsdl-ghd-wb/raidnd/rawdata.csv" //  INPUT path 
val outputFile = "s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv" //  OUTPUT path 
val dateFormat = new SimpleDateFormat("yyyyMMdd")
val fileSystem = getFileSystem(inputFile)
val inputData = readCSVFile(fileSystem, inputFile, skipHeader = true).toSeq

val writer = new PrintWriter(new File(outputFile))
writer.write("Sales,cust,Number,Date,Credit,SKU\n")
filtinp.foreach(x => {
  val (com1, avg1) = com1Average(filtermp, x)
  val (com2, avg2) = com2Average(filtermp, x)
  writer.write(s"${x.Date},${x.cust},${x.Number},${x.Credit}\n")
})
writer.close()

def getFileSystem(path: String): FileSystem = {
val hconf = new Configuration() // initialize new hadoop configuration
new Path(path).getFileSystem(hconf) // get new filesystem to handle data

java.io.FileNotFoundException: s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv(没有这样的文件或目录)

如果我选择新文件或退出文件也会发生同样的情况,我检查了路径是否正确,只是想在其中创建一个新文件。

问题是为了使用基于文件系统的源写入数据,您需要一个临时目录,这是 Spark 使用的提交机制的一部分,即数据首先写入临时目录,一旦任务完成完成后,自动将处理后的文件移动到最终路径。

我是否应该将每个 Spark 应用程序的临时文件夹的路径更改为 S3?我认为最好在本地处理(本地文件 HDFS)然后将处理后的输出文件上传到 S3

我也只是看到我正在使用的数据块集群中没有“没有 Spark 配置集”,这会干扰问题吗?

【问题讨论】:

  • 您能否粘贴您的完整代码,因为此代码看起来不完整,无法理解您要执行的操作?
  • 只计算按客户和 SKU 分组的平均值

标签: scala apache-spark hdfs


【解决方案1】:

如果您能够使用 spark/scala 以 DataFrame 的形式读取原始数据,那么您可以对您的数据帧执行转换以构建最终的数据帧。获得最终数据帧后,需要将其写入为 csv 文件,您只需使用以下单行代码将 csv 文件保存到 s3 存储桶路径或 hdfs 路径。

df.write.format('csv').option('header','true').mode('overwrite').option('sep',',').save('s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv')

【讨论】:

    猜你喜欢
    • 2014-07-27
    • 1970-01-01
    • 2019-09-02
    • 2017-03-17
    • 1970-01-01
    • 2020-03-18
    • 2020-09-15
    • 2018-06-02
    • 1970-01-01
    相关资源
    最近更新 更多