【Question title】: StreamingQueryException: Option 'basePath' must be a directory
【Posted】: 2022-10-21 12:24:24
【Question description】:

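I wrote this code and got the following error: StreamingQueryException: Option 'basePath' must be a directory. My goal is to write a stream to a CSV file sink. The directories output_path/ and checkpoint/ have been created but are empty.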

pipe = Pipeline(stages=indexers)
pipe_model = pipe.fit(dataset)
dataset= pipe_model.transform(dataset)
pipe_model.save("pipe_model")

df = spark\
.readStream\
.option("header", "true")\
.schema(schema)\
.csv("KDDTrain+.txt")

model = PipelineModel.load("pipe_model")
dataset= model.transform(df)

q=dataset.writeStream\
 .format("csv")\
 .option("header", "true")\
 .option("format", "append") \
 .queryName("okk")\
 .trigger(processingTime="10 seconds")\
 .option("checkpointLocation", "checkpoint/")\
 .option("path", "output_path/")\
 .outputMode("append") \
 .start()

q.awaitTermination()

I got this error:

---------------------------------------------------------------------------
StreamingQueryException                   Traceback (most recent call last)
Input In [7], in <cell line: 1>()
----> 1 q.awaitTermination()

File /usr/local/spark/python/pyspark/sql/streaming.py:101, in StreamingQuery.awaitTermination(self, timeout)
    99     return self._jsq.awaitTermination(int(timeout * 1000))
    100 else:
--> 101     return self._jsq.awaitTermination()

File /usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:117, in capture_sql_exception.<locals>.deco(*a, **kw)
   113 converted = convert_exception(e.java_exception)
   114 if not isinstance(converted, UnknownException):
   115     # Hide where the exception came from that shows a non-Pythonic
   116     # JVM exception message.
--> 117     raise converted from None
   118 else:
   119     raise

StreamingQueryException: Option 'basePath' must be a directory
=== Streaming Query ===
Identifier: okk [id = a5a4ac3d-a533-409b-be0b-015ead8d2f4a, runId = 3e3b747d-ba4f-4948-bc9f-4ef360e08979]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/home/jovyan/work/KDDTrain+.txt]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Where is the problem, and how can I fix it?

【Comments】:

    Tags: apache-spark pyspark apache-spark-sql spark-streaming


    【Answer 1】:

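    I have the same problem. I really don't know why it occurs. I found information about a missing Java dependency, installed it, and exported the JAVA_HOME path variable, but it still had no effect. My simple code: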

    def foreach_batch_function(df, epoch_id):
        # fit() returns a PipelineModel; transform with the model, not the Pipeline
        model = pipeline.fit(df)
        df = model.transform(df)
    
    
    # Parentheses let the method chain span several lines
    streaming_query = (
        booktitles_df
        .writeStream
        .trigger(processingTime="10 seconds")
        .queryName("TextVectorized")
        .foreachBatch(foreach_batch_function)
        .option("checkpointLocation", "checkpoint")
        .option("path", "/content/results/")
        .start()
    )
    

    It finally fails when awaitTermination() is applied. I don't know why.
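
A possible explanation, not confirmed by either post: the question's code passes a single file ("KDDTrain+.txt") to readStream...csv(...), while Spark's streaming file source is documented as reading files from a directory. If that is the cause, placing the input file inside a directory and pointing the stream at that directory may make the error go away. The sketch below is plain Python; `stage_as_stream_dir` and the directory name "input_stream/" are hypothetical names introduced here for illustration and are not part of PySpark.

```python
import shutil
from pathlib import Path


def stage_as_stream_dir(src_file: str, stream_dir: str) -> Path:
    """Copy one input file into a directory that a file stream can watch.

    Hypothetical helper (an assumption, not a PySpark API): it only prepares
    the directory layout; it does not start any Spark query.
    """
    src = Path(src_file)
    if not src.is_file():
        raise FileNotFoundError(f"{src} is not an existing file")

    target = Path(stream_dir)
    # Create the source directory if it does not exist yet.
    target.mkdir(parents=True, exist_ok=True)

    # Copy the file into the directory under its original name.
    shutil.copy(src, target / src.name)
    return target


# Usage with the question's code (assumes the `spark` session and `schema`
# from the question are already defined):
#
#   input_dir = stage_as_stream_dir("KDDTrain+.txt", "input_stream/")
#   df = (
#       spark.readStream
#       .option("header", "true")
#       .schema(schema)
#       .csv(str(input_dir))   # a directory, not a single file
#   )
```

The rest of the question's pipeline (loading the PipelineModel, writeStream to the CSV sink) would stay unchanged under this assumption; only the source path differs.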
