【问题标题】:Connect to S3 data from PySpark从 PySpark 连接到 S3 数据
【发布时间】:2015-11-16 07:14:15
【问题描述】:

我正在尝试从 Amazon s3 读取 JSON 文件,以创建 Spark 上下文并使用它来处理数据。

Spark 基本上是在一个 docker 容器中。所以把文件放到 docker 路径也是 PITA。因此将其推送到 S3。

下面的代码解释了其余的东西。

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)

config_dict = {"fs.s3n.awsAccessKeyId":"**",
               "fs.s3n.awsSecretAccessKey":"**"}

bucket = "nonamecpp"
prefix = "dataset.json"
filename = "s3n://{}/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
                    'org.apache.hadoop.mapred.TextInputFormat',
                    'org.apache.hadoop.io.Text',
                    'org.apache.hadoop.io.LongWritable',
                    conf=config_dict)

我收到以下错误 -

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-b94543fb0e8e> in <module>()
      9                     'org.apache.hadoop.io.Text',
     10                     'org.apache.hadoop.io.LongWritable',
---> 11                     conf=config_dict)
     12 

/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
    558         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
    559                                               valueClass, keyConverter, valueConverter,
--> 560                                               jconf, batchSize)
    561         return RDD(jrdd, self)
    562 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
    at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
    at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:744)

我已经明确提供了 aswSecretAccessKey 和 awsAccessId。怎么了?

【问题讨论】:

    标签: python hadoop amazon-s3 apache-spark pyspark


    【解决方案1】:

    我已经解决了将--packages org.apache.hadoop:hadoop-aws:2.7.1 添加到 spark-submit 命令的问题。

    它将下载所有 hadoop 缺失的包,这些包将允许您使用 S3 执行 spark 作业。

    然后在您的工作中,您需要设置您的 AWS 凭证,例如:

    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)
    

    关于设置凭据的其他选项是将它们定义到 spark/conf/spark-env 中:

    #!/usr/bin/env bash
    AWS_ACCESS_KEY_ID='xxxx'
    AWS_SECRET_ACCESS_KEY='xxxx'
    
    SPARK_WORKER_CORES=1 # to set the number of cores to use on this machine
    SPARK_WORKER_MEMORY=1g # to set how much total memory workers have to give executors (e.g. 1000m, 2g)
    SPARK_EXECUTOR_INSTANCES=10 #, to set the number of worker processes per node
    

    更多信息:

    【讨论】:

    • 如何从 s3 的文件夹中读取 parquet 文件? (pyspark)。尝试阅读镶木地板时,上面的代码对我不起作用
    • 您能说明一下您是如何尝试从 parquet 文件中读取数据的吗?
    • sc = SparkContext.getOrCreate() sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'A') sc._jsc.hadoopConfiguration().set("fs. s3n.awsSecretAccessKey", 's') sqlContext = SQLContext(sc) df2 = sqlContext.read.parquet(s3://path/to/folder)
    • 如我所见,您正在从 s3 路径而不是 s3n 读取。在这种情况下,您可以尝试使用 fs.s3.awsAccessKeyId 和 fs.s3.awsSecretAccessKey 设置您的密钥
    【解决方案2】:

    我建议通过这个link

    就我而言,我使用实例配置文件凭据来访问 s3 数据。

    实例配置文件凭证 – 在 EC2 实例上使用并交付 通过 Amazon EC2 元数据服务。适用于 Java 的 AWS 开发工具包使用 InstanceProfileCredentialsProvider 来加载这些凭据。

    注意

    仅在以下情况下使用实例配置文件凭据 未设置 AWS_CONTAINER_CREDENTIALS_RELATIVE_URI。看 EC2ContainerCredentialsProviderWrapper 了解更多信息。

    对于 pyspark,我使用设置来访问 s3 内容。

    def get_spark_context(app_name):
        # configure
        conf = pyspark.SparkConf()
    
        # init & return
        sc = pyspark.SparkContext.getOrCreate(conf=conf)
    
        # s3a config
        sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint',
                                          's3.eu-central-1.amazonaws.com')
        sc._jsc.hadoopConfiguration().set(
            'fs.s3a.aws.credentials.provider',
            'com.amazonaws.auth.InstanceProfileCredentialsProvider,'
            'com.amazonaws.auth.profile.ProfileCredentialsProvider'
        )
    
        return pyspark.SQLContext(sparkContext=sc)
    
    

    更多关于火花上下文here

    请参考this 获取类型 S3 访问权限。

    【讨论】:

    • FWIW,S3A 连接器默认在其供应商列表中包含 EC2 IAM 凭据提供程序(列表中的最后一个是最慢的并且可以触发限制)。标准顺序是:URL 中的机密(错误;从最新版本中删除)、XML 或 JCEKS 文件中的 fs.s3a.secret 设置、环境变量、IAM 角色。 Spark-submit 还将查找 AWS_ env vars 并从中设置 s3n 和 s3a 键值。
    • @SteveLoughran 到目前为止,我一直在处理 20G 数据,没有任何问题。但是,很好的补充,注意节流。
    • 限制主要是关于 InstanceProfileCredentialsProvider 早期版本中的一个错误,其中每个实例都是唯一的;它已移至所有线程中的单例。否则:单个 JVM 中的每个文件系统客户端都在访问 AWS Auth 服务,这些服务与其他所有服务一样受到限制。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-06-08
    • 1970-01-01
    • 2018-02-02
    • 2017-03-06
    • 2023-03-22
    • 1970-01-01
    相关资源
    最近更新 更多