【问题标题】:SparkException: Task failed while writing rowsSparkException:写入行时任务失败
【发布时间】:2018-11-30 20:00:35
【问题描述】:

使用spark-streaming消费来自Kafka的数据,然后以orc格式写入HDFS

Kafka中存储的数据是这样的:

hadoop
hive
impala
hive

我的代码:

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[4]")
      .appName("SpeedTester")
      .config("spark.driver.memory", "3g")
      .getOrCreate()

    val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.95.20:9092")
      .option("subscribe", "trial")
      .option("startingOffsets" , "earliest")
      .load()
      .selectExpr("CAST(value as string)")
      .writeStream
      .outputMode("append")
      .format("orc")
      .option("path", "hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
      .awaitTermination()
  }

代码可以成功将text格式的数据写入HDFS。但是,当我将其更改为orc 格式时,它会返回:

Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo/part-00000-cfd9991f-e503-4140-811b-a00f7da7191e-c000.snappy.orc
        at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270)
        at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262)

这个问题的原因是什么以及如何解决? 任何帮助表示赞赏。


顺便说一下Hive表创建句:

create table test.demo (demo string)
stored as orc;

【问题讨论】:

  • 必须尝试在 HDFS 中查找文件夹不为空并且运行 Spark 进程的用户有权从该文件夹中读取?
  • @AlonsoDominguez 感谢您的回复。所有相关文件夹均已删除,我使用 sudo -u hdfs 提交 spark 作业,因此它应该具有权限。

标签: apache-spark hadoop spark-streaming spark-structured-streaming


【解决方案1】:

您需要创建一个新的 hive 会话,然后使用它以 ORC 格式存储数据。代码看起来像(未经测试,因为我无权访问任何火花集群):

def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder.master("local[4]")
  .appName("SpeedTester")
  .config("spark.driver.memory", "3g")
  .getOrCreate()

// create a new hive context from the spark context
val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark)


// create the data frame and write it to orc
// output will be a directory of orc files
val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.95.20:9092")
  .option("subscribe", "trial")
  .option("startingOffsets" , "earliest")
  .load()

ds.write.mode(SaveMode.Overwrite)
  .format("orc")
  .save("hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo/")
}

试试这个,让我知道它是否有效!

【讨论】:

  • 非常感谢您的回复~。生成hive_context 时出现错误。但是好像hiveContext没有用。
  • 这个解决方案有效吗?如果是,请支持答案! ;-)
猜你喜欢
  • 2016-07-02
  • 2020-04-06
  • 1970-01-01
  • 2020-03-07
  • 2018-04-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-05-29
相关资源
最近更新 更多