【Title】: Error: java.lang.IllegalArgumentException: Option 'basePath' must be a directory
【Posted】: 2019-05-23 02:45:04
【Question】:

Following the book at https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-structured-streaming.adoc, I am trying out Spark Structured Streaming from spark-shell, but I am having a hard time getting it to work.

My code:

import org.apache.spark.sql.Encoders
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

sc.setLogLevel("INFO")

case class KafkaMessage(topic: String, id: String, data: String)

val schema = Encoders.product[KafkaMessage].schema

val ds = spark.
         readStream.
         schema(schema).
         format("csv").
         option("header","false").
         option("sep", ";").
         load("file:///tmp/kafka-sample-messages.csv").
         as[KafkaMessage]

val msgs = ds.
           groupBy('id).
           agg(count('id) as "total")

val msgsStream = msgs.
                 writeStream.
                 format("console").
                 outputMode(OutputMode.Complete).
                 queryName("textStream").
                 start

After starting msgsStream, I get the following error:

scala> val msgsStream = msgs.
     |                  writeStream.
     |                  format("console").
     |                  outputMode(OutputMode.Complete).
     |                  queryName("textStream").
     |                  start
18/01/20 13:07:16 INFO StreamExecution: Starting textStream [id = 5c78ce99-cfb1-4d23-89e8-7bc59bd29f74, runId = 1a64f570-2871-4d6e-bbcd-7afdb2cac135]. Use /tmp/temporary-3b1bf0dc-72cf-439e-b499-ecfc802abe2e to store the query checkpoint.
msgsStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@89537c1

scala> 18/01/20 13:07:16 INFO FileStreamSourceLog: Set the compact interval to 10 [defaultCompactInterval: 10]
18/01/20 13:07:16 INFO FileStreamSource: maxFilesPerBatch = None, maxFileAgeMs = 604800000
18/01/20 13:07:16 INFO SessionState: Created local directory: /tmp/1231fb5e-6bba-4c1d-a013-97d5a3bce3ac_resources
18/01/20 13:07:16 INFO SessionState: Created HDFS directory: /tmp/hive/kleysonr/1231fb5e-6bba-4c1d-a013-97d5a3bce3ac
18/01/20 13:07:16 INFO SessionState: Created local directory: /tmp/kleysonr/1231fb5e-6bba-4c1d-a013-97d5a3bce3ac
18/01/20 13:07:16 INFO SessionState: Created HDFS directory: /tmp/hive/kleysonr/1231fb5e-6bba-4c1d-a013-97d5a3bce3ac/_tmp_space.db
18/01/20 13:07:16 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/mnt/storage/softwares/spark-2.2.1-bin-hadoop2.7/spark-warehouse
18/01/20 13:07:16 INFO StreamExecution: Starting new streaming query.
18/01/20 13:07:16 INFO FileStreamSource: Log offset set to 0 with 1 new files
18/01/20 13:07:16 INFO StreamExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1516460836287,Map(spark.sql.shuffle.partitions -> 200))
18/01/20 13:07:16 INFO FileStreamSource: Processing 1 files from 0:0
18/01/20 13:07:16 ERROR StreamExecution: Query textStream [id = 5c78ce99-cfb1-4d23-89e8-7bc59bd29f74, runId = 1a64f570-2871-4d6e-bbcd-7afdb2cac135] terminated with error
java.lang.IllegalArgumentException: Option 'basePath' must be a directory
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.basePaths(PartitioningAwareFileIndex.scala:221)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:70)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:134)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:353)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:174)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:610)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:610)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:610)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:609)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)

I am using Spark spark-2.2.1-bin-hadoop2.7 and starting spark-shell with the following command:

bin/spark-shell --driver-memory 2g --executor-memory 2g --driver-cores 1 --executor-cores 1

【Comments】:

    Tags: apache-spark spark-structured-streaming


    【Solution 1】:

    就像例外所说的那样。结构化流中的路径必须是目录而不是文件。

    If the input files are dropped directly into /tmp (which obviously won't work well in practice, since /tmp is shared), you could load the whole directory:

    load("/tmp/")
    

    But normally you should use a dedicated directory:

    load("/tmp/my_messages/")
    
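    Putting this together with the code from the question, here is a minimal sketch of the fixed source (the /tmp/my_messages/ directory name is just an illustration; move the sample CSV into it first):

    // Point the streaming source at a directory; Spark will process the
    // files already there and pick up new ones as they arrive.
    val ds = spark.
             readStream.
             schema(schema).
             format("csv").
             option("header", "false").
             option("sep", ";").
             load("file:///tmp/my_messages/").  // a directory, not a single file
             as[KafkaMessage]
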

    【Discussion】:

      【Solution 2】:
      load("path/filename*.csv")
      
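      Applied to the question, only the load line needs to change; the path below is the question's own file with a trailing '*' appended (an assumption based on the comment under this answer):

      // A glob pattern is resolved to the matching files instead of being
      // treated as a single-file 'basePath', so the directory check passes.
      load("file:///tmp/kafka-sample-messages.csv*")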

      This is my answer, and it worked!

      【Discussion】:

      • Yes, I just appended a '*' to the end of the file name, and that also worked.