[Posted]: 2020-11-15 14:59:28
[Question]:
I am trying to write from a PySpark structured stream to GCP Cloud Storage.
Here is the code:
df_test\
.writeStream.format("parquet")\
.option("path","gs://{my_bucketname}/test")\
.option("checkpointLocation", "gs://{my_checkpointBucket}/checkpoint")\
.start()\
.awaitTermination()
But I get this error:
20/11/15 16:37:59 WARN CheckpointFileManager: Could not use FileContext API
for managing Structured Streaming checkpoint files at
gs://name-bucket/test/_spark_metadata.
Using FileSystem API instead for managing log files. If the implementation
of FileSystem.rename() is not atomic, then the correctness and
fault-tolerance of your Structured Streaming is not guaranteed.
Traceback (most recent call last):
  File "testgcp.py", line 40, in <module>
    .option("checkpointLocation", "gs://check_point_bucket/checkpoint")\
  File "/home/naya/anaconda3/lib/python3.6/site-packages/pyspark/sql/streaming.py", line 1105, in start
    return self._sq(self._jwrite.start())
  File "/home/naya/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/naya/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/naya/anaconda3/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.start.
: java.io.IOException: No FileSystem for scheme: gs
What is the correct way to do this?
[Discussion]:
Tags: google-cloud-platform pyspark spark-streaming
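The exception `java.io.IOException: No FileSystem for scheme: gs` indicates the problem is not the `writeStream` syntax but that Spark's Hadoop layer has no filesystem implementation registered for the `gs://` scheme. This is typically addressed by putting the GCS Hadoop connector jar on the classpath and pointing `fs.gs.impl` at it. A minimal sketch of such a session configuration, assuming hypothetical local paths for the connector jar and the service-account key file:

```python
# Sketch only: the jar path, keyfile path, and bucket name below are
# placeholders, not values from the original question.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("gcs-streaming-sketch")
    # The GCS connector jar must be visible to driver and executors.
    .config("spark.jars", "/path/to/gcs-connector-hadoop2-latest.jar")
    # Register the filesystem implementations for the gs:// scheme.
    .config("spark.hadoop.fs.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    # Authenticate with a service-account JSON key (one common option).
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile",
            "/path/to/keyfile.json")
    .getOrCreate()
)

# With the scheme registered, the original writeStream call should resolve
# gs:// paths instead of raising "No FileSystem for scheme: gs".
```

Registering `fs.AbstractFileSystem.gs.impl` as well lets Spark use the FileContext API for checkpoints, which also silences the `CheckpointFileManager` warning about non-atomic renames.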