【问题标题】:Amazon EMR and S3, org.apache.spark.sql.AnalysisException: path s3://..../var/table already existsAmazon EMR 和 S3,org.apache.spark.sql.AnalysisException:路径 s3://..../var/table 已存在
【发布时间】:2016-12-09 11:38:02
【问题描述】:

我正在尝试在 Spark 2.0.0 上查找错误的来源,我有一个将表名作为键和数据框作为值的映射,我循环遍历它并最终使用 spark-avro ( 3.0.0-preview2) 将所有内容写入 S3 目录。它在本地运行完美(当然使用本地路径而不是 s3 路径),但是当我在 Amazon 的 EMR 上运行它时,它运行了一段时间,然后它说文件夹已经存在并终止(这意味着相同的键值不止一次在那个 for 循环中使用,对吧?)。这可能是线程的问题吗?

for ((k, v) <- tableMap) {
  val currTable: DataFrame = tableMap(k)
  val decryptedCurrTable = currTable.withColumn("data", decryptUDF(currTable("data")))
  val decryptedCurrTableData = sparkSession.sqlContext.read.json(decryptedCurrTable.select("data").rdd.map(row => row.toString()))
  decryptedCurrTable.write.avro(s"s3://..../$k/table")
  decryptedCurrTableData.write.avro(s"s3://..../$k/tableData")

【问题讨论】:

  • 你有错误日志吗?
  • 我知道,但我相信我回答了我自己的问题。我在写入之后添加了一个 .mode("append") 并且一切正常,所以它一定是 imo 的并发问题。

标签: apache-spark amazon-s3 spark-dataframe amazon-emr spark-avro


【解决方案1】:

我认为这是一个并发问题,我将代码更改为:

decryptedCurrTable.write.mode("append").avro(s"s3://..../$k/table")
decryptedCurrTableData.write.mode("append").avro(s"s3://..../$k/tableData")  

一切正常。

【讨论】:

  • 嗨,我正在研究一个类似的用例,但根据我在你自己的答案中看到的,你的密钥 $k 不断变化,这意味着你没有写到同一个目的地,因此“ append”或“override”模式不应该影响并发?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-06-29
  • 1970-01-01
相关资源
最近更新 更多