【问题标题】:Writing files to local system with Spark in Cluster mode在集群模式下使用 Spark 将文件写入本地系统
【发布时间】:2016-11-24 12:10:36
【问题描述】:

我知道这是使用 Spark 的一种奇怪方式,但我正在尝试使用 Spark 将数据帧保存到本地文件系统(不是 hdfs),即使我在 cluster mode 中。我知道我可以使用client mode,但我确实想在cluster mode 中运行,并且不关心应用程序将作为驱动程序在哪个节点(共3 个)上运行。 下面的代码是我正在尝试做的伪代码。

// create dataframe
val df = Seq(Foo("John", "Doe"), Foo("Jane", "Doe")).toDF()
// save it to the local file system using 'file://' because it defaults to hdfs://
df.coalesce(1).rdd.saveAsTextFile(s"file://path/to/file")

这就是我提交 spark 应用程序的方式。

spark-submit --class sample.HBaseSparkRSample --master yarn-cluster hbase-spark-r-sample-assembly-1.0.jar

如果我在 local mode 中但不在 yarn-cluster mode 中,这可以正常工作。

例如,java.io.IOException: Mkdirs failed to create file 出现在上面的代码中。

我已将 df.coalesce(1) 部分更改为 df.collect 并尝试使用纯 Scala 保存文件,但最终得到 Permission denied

我也试过了:

  • spark-submitroot 用户
  • chowned yarn:yarn, yarn:hadoop, spark:spark
  • 给相关目录chmod 777

但没有运气。

我假设这与 clustersdrivers and executorsuser 有关,他们正试图写入本地文件系统,但我几乎无法自己解决这个问题。

我正在使用:

  • 火花:1.6.0-cdh5.8.2
  • 斯卡拉:2.10.5
  • Hadoop:2.6.0-cdh5.8.2

欢迎任何支持,并提前致谢。

我尝试过的一些文章:

  • “Spark saveAsTextFile() 导致 Mkdirs 无法为一半目录创建”-> 尝试更改用户但没有任何改变
  • “未能将 RDD 作为文本文件保存到本地文件系统”-> chmod 没有帮助我

已编辑 (2016/11/25)

这是我得到的例外。

java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/11/24 20:24:12 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

    标签: scala hadoop apache-spark


    【解决方案1】:

    我将回答我自己的问题,因为最终,似乎没有一个答案不能解决我的问题。尽管如此,感谢所有答案并指出我可以检查的替代方案。

    我认为@Ricardo 最接近提到了 Spark 应用程序的用户。我检查了whoamiProcess("whoami"),用户是yarn。问题可能是我试图输出到/home/foo/work/rhbase/r/input/input.csv,尽管/home/foo/work/rhbaseyarn:yarn 所有,/home/foofoo:foo 所有。我没有详细检查,但这可能是导致permission 问题的原因。

    当我在我的 Spark 应用程序中使用 Process("pwd") 命中 pwd 时,它会输出 /yarn/path/to/somewhere。所以我决定将我的文件输出到/yarn/input.csv,尽管在cluster mode 中它还是成功的。

    我可能可以得出结论,这只是一个简单的权限问题。欢迎任何进一步的解决方案,但现在,这就是我解决这个问题的方式。

    【讨论】:

    • 嗨,我有一个类似的问题。我在集群模式下运行 spark 代码并使用 pandas 编写一些文件,我认为这些文件是写在 yarn temp 目录中但我想将它们写在 HDFS 中,有没有直接的方法可以做到这一点?
    【解决方案2】:

    如果您以yarn-cluster mode 运行作业,驱动程序将在任何由 YARN 管理的机器上运行,因此如果saveAsTextFile 具有本地文件路径,那么它将将输出存储在任何机器上驱动程序在哪里运行。

    尝试以yarn-client mode 运行作业,以便驱动程序在客户端计算机中运行

    【讨论】:

    • 感谢您的建议。我知道这很奇怪,但我确实想在yarn-cluster mode 中运行。我有 3 个节点,我不在乎驱动程序将在哪个节点。
    • 但是无论哪个节点驱动程序正在运行,都只会创建文件:)
    • 为什么不能将数据存储在 hdfs 路径中?您可以以纱线集群模式运行
    • 问题是saveAsTextFile抛出permission denied异常,驱动节点无法写入本地文件系统。我有一个只能从本地文件系统读取的shell script。这就是我不能使用 hdfs 的原因......另外,我不能编辑脚本。我知道这是使用 Spark 的一种奇怪方式。
    • 能否发送完整的登录权限被拒绝错误
    【解决方案3】:

    检查您是否尝试使用 Spark 服务以外的用户运行/写入文件。 在这种情况下,您可以通过预设目录 ACL 来解决权限问题。示例:

    setfacl -d -m group:spark:rwx /path/to/
    

    (将“spark”修改为您尝试写入文件的用户组)

    【讨论】:

      【解决方案4】:

      使用 forEachPartition 方法,然后为每个分区获取文件系统对象并一一写入记录,下面是我正在写入 hdfs 的示例代码,您也可以使用本地文件系统

      Dataset<String> ds=....
      
      ds.toJavaRdd.foreachPartition(new VoidFunction<Iterator<String>>() {
          @Override
          public void call(Iterator<String> iterator) throws Exception {
      
          final FileSystem hdfsFileSystem = FileSystem.get(URI.create(finalOutPathLocation), hadoopConf);
      
          final FSDataOutputStream fsDataOutPutStream = hdfsFileSystem.exists(finalOutPath)
                  ? hdfsFileSystem.append(finalOutPath) : hdfsFileSystem.create(finalOutPath);
      
      
          long processedRecCtr = 0;
          long failedRecsCtr = 0;
      
      
          while (iterator.hasNext()) {
      
              try {
                  fsDataOutPutStream.writeUTF(iterator.next);
              } catch (Exception e) {
                  failedRecsCtr++;
              }
              if (processedRecCtr % 3000 == 0) {
                  LOGGER.info("Flushing Records");
                  fsDataOutPutStream.flush();
              }
          }
      
          fsDataOutPutStream.close();
              }
      });
      

      【讨论】:

        【解决方案5】:

        请参考spark文档了解--master选项在spark-submit中的使用。

        • --master local应该在本地运行时使用。

        • --master yarn --deploy-mode cluster 应该在实际运行在纱线集群上时使用。

        请参阅 thisthis

        【讨论】:

        • 感谢您的快速回复。这是否意味着我错过了使用参数?我的其他 spark 应用程序在使用 yarn-cluster 参数的集群模式下似乎确实可以正常工作。我要编辑我的问题,但我确实想在集群模式下运行,但想保存到本地文件系统,即使我不知道它将是哪个节点。
        猜你喜欢
        • 2019-11-25
        • 2020-10-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-01-25
        • 2021-12-02
        • 2017-04-19
        • 1970-01-01
        相关资源
        最近更新 更多