【问题标题】:No FileSystem for scheme: cos方案没有文件系统:cos
【发布时间】:2018-02-11 04:38:50
【问题描述】:

我正在尝试从 IBM Data Science Experience 连接到 IBM Cloud Object Storage:

access_key = 'XXX'
secret_key = 'XXX'
bucket = 'mybucket'
host = 'lon.ibmselect.objstor.com' 
service = 'mycos'

sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.myCos.access.key', access_key)
hconf.set('fs.cos.myCos.endpoint', 'http://' + host)
hconf.set('fs.cose.myCos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')

obj = 'mydata.tsv.gz'

rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())

这会返回:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: cos

我猜我需要使用基于 stocator docs 的“cos”方案。但是,错误提示 stocator 不可用或者是旧版本?

有什么想法吗?


更新 1:

我也尝试了以下方法:

sqlCxt = SQLContext(sc)
hconf = sc._jsc.hadoopConfiguration()
hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
hconf.set('fs.stocator.scheme.list', 'cos')
hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
hconf.set('fs.stocator.cos.scheme', 'cos')
hconf.set('fs.cos.mycos.access.key', access_key)
hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
hconf.set('fs.cos.mycos.secret.key', secret_key)
hconf.set('fs.cos.service.v2.signer.type', 'false')

service = 'mycos'
obj = 'mydata.tsv.gz'          
rdd = sc.textFile('cos://{0}.{1}/{2}'.format(bucket, service, obj))
print(rdd.count())

然而,这次的回应是:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No object store for: cos
    at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:121)
    ...
Caused by: java.lang.ClassNotFoundException: com.ibm.stocator.fs.cos.COSAPIClient

【问题讨论】:

    标签: pyspark data-science-experience ibm-cloud-storage stocator


    【解决方案1】:

    支持 fs.cos 方案的最新版 Stocator (v1.0.9) 尚未部署在 Spark aaService 上(即将推出)。请使用 stocator 方案“fs.s3d”连接到您的 COS。

    例子:

    endpoint = 'endpointXXX' 
    access_key = 'XXX'
    secret_key = 'XXX'
    
    prefix = "fs.s3d.service"
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + ".endpoint", endpoint)
    hconf.set(prefix + ".access.key", access_key)
    hconf.set(prefix + ".secret.key", secret_key)
    
    bucket = 'mybucket'
    obj = 'mydata.tsv.gz'
    
    rdd = sc.textFile('s3d://{0}.service/{1}'.format(bucket, obj))
    rdd.count()
    

    或者,您可以使用 ibmos2spark。该库已经安装在我们的服务上。示例:

    import ibmos2spark
    
    credentials = {
       'endpoint': 'endpointXXXX',
       'access_key': 'XXXX',
       'secret_key': 'XXXX'
    }
    
    configuration_name = 'os_configs' # any string you want
    cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name)
    
    bucket = 'mybucket'
    obj = 'mydata.tsv.gz'
    rdd = sc.textFile(cos.url(obj, bucket))
    rdd.count()
    

    【讨论】:

      【解决方案2】:

      Stocator 位于 Spark 2.0 和 2.1 内核的类路径中,但未配置 cos 方案。您可以通过在 Python 笔记本中执行以下命令来访问配置:

      !cat $SPARK_CONF_DIR/core-site.xml
      

      查找属性fs.stocator.scheme.list。我目前看到的是:

      <property>
          <name>fs.stocator.scheme.list</name>
          <value>swift2d,swift,s3d</value>
      </property>
      

      我建议您针对 DSX 提出功能请求以支持 cos 方案。

      【讨论】:

        【解决方案3】:

        看起来 cos 驱动程序没有正确初始化。试试这个配置:

        hconf.set('fs.cos.impl', 'com.ibm.stocator.fs.ObjectStoreFileSystem')
        
        hconf.set('fs.stocator.scheme.list', 'cos')
        hconf.set('fs.stocator.cos.impl', 'com.ibm.stocator.fs.cos.COSAPIClient')
        hconf.set('fs.stocator.cos.scheme', 'cos')
        
        hconf.set('fs.cos.mycos.access.key', access_key)
        hconf.set('fs.cos.mycos.endpoint', 'http://' + host)
        hconf.set('fs.cos.mycos.secret.key', secret_key)
        hconf.set('fs.cos.service.v2.signer.type', 'false')
        

        更新 1:

        您还需要确保 stocator 类位于类路径中。您可以通过以下方式执行pyspark来使用包系统:

        ./bin/pyspark --packages com.ibm.stocator:stocator:1.0.24
        

        这适用于 swift2dcos 方案。

        更新 2:

        只需遵循 Stocator 文档 (https://github.com/CODAIT/stocator)。它包含所有详细信息如何安装它,使用什么分支等。

        【讨论】:

        • 不幸的是,这不起作用。我已经更新了我的问题以反映这一点。
        • @ChrisSnow 是的,但现在不是配置问题,而是缺少库。请查看更新的答案。
        【解决方案4】:

        我发现了同样的问题,为了解决它,我只是改变了环境:

        在 IBM Watson Studio 中,如果您在没有预配置 Spark 集群的环境中启动 Jupyter Notebook,则会收到该错误。安装PySpark 是不够的。

        相反,如果您使用可用的 Spark 集群启动笔记本,就可以了。

        【讨论】:

          【解决方案5】:

          您必须设置.config("spark.hadoop.fs.stocator.scheme.list", "cos") 以及其他一些fs.cos... 配置。 这是一个可以工作的端到端 sn-p 代码示例(使用 pyspark==2.3.2Python 3.7.3 测试):

          from pyspark.sql import SparkSession
          
          stocator_jar = '/path/to/stocator-1.1.2-SNAPSHOT-IBM-SDK.jar'
          cos_instance_name = '<myCosIntanceName>'
          bucket_name = '<bucketName>'
          s3_region = '<region>'
          cos_iam_api_key = '*******'
          iam_servicce_id = 'crn:v1:bluemix:public:iam-identity::<****************>'
          
          spark_builder = (
              SparkSession
                  .builder
                  .appName('test_app'))
          
          spark_builder.config('spark.driver.extraClassPath', stocator_jar)
          spark_builder.config('spark.executor.extraClassPath', stocator_jar)
          spark_builder.config(f"fs.cos.{cos_instance_name}.iam.api.key", cos_iam_api_key)
          spark_builder.config(f"fs.cos.{cos_instance_name}.endpoint", f"s3.{s3_region}.cloud-object-storage.appdomain.cloud")
          spark_builder.config(f"fs.cos.{cos_instance_name}.iam.service.id", iam_servicce_id)
          spark_builder.config("spark.hadoop.fs.stocator.scheme.list", "cos")
          spark_builder.config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
          spark_builder.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
          spark_builder.config("fs.stocator.cos.scheme", "cos")
          
          spark_sess = spark_builder.getOrCreate()
          
          dataset = spark_sess.range(1, 10)
          dataset = dataset.withColumnRenamed('id', 'user_idx')
          
          dataset.repartition(1).write.csv(
              f'cos://{bucket_name}.{cos_instance_name}/test.csv',
              mode='overwrite',
              header=True)
          
          spark_sess.stop()
          print('done!')
          

          【讨论】:

            猜你喜欢
            • 2021-05-25
            • 1970-01-01
            • 2021-12-12
            • 2021-11-15
            • 2016-03-06
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多