【Question title】: How to write to Google Cloud Storage with PySpark writeStream?
【Posted】: 2020-11-15 14:59:28
【Question】:

I am trying to write from a PySpark stream to Google Cloud Storage.
Here is the code:

df_test\
.writeStream.format("parquet")\
.option("path","gs://{my_bucketname}/test")\
.option("checkpointLocation", "gs://{my_checkpointBucket}/checkpoint")\
.start()\
.awaitTermination()

But I get this error:

20/11/15 16:37:59 WARN CheckpointFileManager: Could not use FileContext API for managing Structured Streaming checkpoint files at gs://name-bucket/test/_spark_metadata. Using FileSystem API instead for managing log files. If the implementation of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of your Structured Streaming is not guaranteed.
Traceback (most recent call last):
  File "testgcp.py", line 40, in <module>
    .option("checkpointLocation", "gs://check_point_bucket/checkpoint")\
  File "/home/naya/anaconda3/lib/python3.6/site-packages/pyspark/sql/streaming.py", line 1105, in start
    return self._sq(self._jwrite.start())
  File "/home/naya/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/naya/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/naya/anaconda3/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o55.start.
: java.io.IOException: No FileSystem for scheme: gs

What should the correct syntax be?

【Comments】:

    Tags: google-cloud-platform pyspark spark-streaming


    【Solution 1】:

    According to this documentation, it seems you first need to configure spark.conf with the correct authentication settings.

    spark.conf.set("google.cloud.auth.service.account.enable", "true")
    spark.conf.set("google.cloud.auth.service.account.email", "Your_service_email")
    spark.conf.set("google.cloud.auth.service.account.keyfile", "path/to/your/files")
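The error in the question, "No FileSystem for scheme: gs", says that Hadoop has no filesystem implementation registered for gs:// paths, so authentication settings alone may not be enough if the GCS connector is not on the classpath. The following is a minimal sketch of one way to register it when the session is built. The helper names gcs_spark_conf and build_session, the application name, and the connector_jar argument are hypothetical and not part of the original answer. The property names are the GCS connector's Hadoop keys as I understand them; they have changed between connector versions, so treat them as assumptions and check them against the version you install.

```python
# Hypothetical helpers, not from the original answer.
# Property names are assumed to match the installed GCS connector version.

def gcs_spark_conf(keyfile_path):
    """Return Spark settings that map the gs:// scheme to the GCS connector."""
    return {
        # Filesystem classes shipped in the GCS connector jar.
        "spark.hadoop.fs.gs.impl":
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "spark.hadoop.fs.AbstractFileSystem.gs.impl":
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        # Service-account authentication with a JSON key file.
        "spark.hadoop.google.cloud.auth.service.account.enable": "true",
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
            keyfile_path,
    }


def build_session(keyfile_path, connector_jar):
    """Build a SparkSession with the connector jar on the classpath."""
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    builder = SparkSession.builder.appName("gcs-stream-test")
    # connector_jar is a local path to the downloaded GCS connector jar.
    builder = builder.config("spark.jars", connector_jar)
    for key, value in gcs_spark_conf(keyfile_path).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

The "spark.hadoop." prefix passes each setting through to the Hadoop configuration at session start, which is why the sketch sets them on the builder rather than on an already running session.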
    

    Then you can use the read function to access files in the bucket.

    df = spark.read.option("header",True).csv("gs://bucket_name/path_to_your_file.csv")
    df.show() 
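Once gs:// paths resolve, the streaming write from the question should work the same way as the batch read. Below is a rough sketch, assuming the braces in the question's paths are placeholders for real bucket names. The helper names gcs_sink_options and start_parquet_stream and the default prefix are hypothetical; the options themselves (path, checkpointLocation) are the ones the question already uses.

```python
# Hypothetical helpers that rebuild the question's writeStream call.

def gcs_sink_options(data_bucket, checkpoint_bucket, prefix="test"):
    """Build the file sink options used in the question, with real bucket names."""
    return {
        "path": f"gs://{data_bucket}/{prefix}",
        "checkpointLocation": f"gs://{checkpoint_bucket}/checkpoint",
    }


def start_parquet_stream(df, options):
    """Start a Parquet streaming query on a streaming DataFrame."""
    # df must be a streaming DataFrame (for example from spark.readStream).
    return df.writeStream.format("parquet").options(**options).start()
```

A caller would then run start_parquet_stream(df_test, gcs_sink_options("my-data-bucket", "my-checkpoint-bucket")).awaitTermination(), where both bucket names are made up for illustration.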
    

    【Discussion】:
