【问题标题】:Reading multiple files from S3 in parallel (Spark, Java)从 S3 并行读取多个文件(Spark、Java)
【发布时间】:2017-04-25 01:36:43
【问题描述】:

我看到了一些关于此的讨论,但不太了解正确的解决方案: 我想将几百个文件从 S3 加载到 RDD 中。这是我现在的做法:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

ReadFromS3Function 使用AmazonS3 客户端进行实际读取:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();

我从我在 Scala 中看到的相同问题的答案中“翻译”了这一点。我认为也可以将整个路径列表传递给sc.textFile(...),但我不确定哪个是最佳实践方式。

【问题讨论】:

    标签: java apache-spark amazon-s3


    【解决方案1】:

    潜在的问题是在 s3 中列出对象真的很慢,而且它看起来像目录树的方式会在执行 treewalk 时降低性能(就像路径的通配符模式处理一样)。

    帖子中的代码正在执行所有子列表,它提供了更好的性能,它本质上是 Hadoop 2.8 和 s3a listFiles(path, recursive) 附带的,请参阅HADOOP-13208

    获得该列表后,您将获得对象路径的字符串,然后您可以将其映射到 s3a/s3n 路径,以便 spark 将其作为文本文件输入进行处理,然后您可以对其应用工作

    val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",")
    sc.textFile(files).map(...)
    

    根据要求,这是使用的 java 代码。

    String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
    objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
    // repeat while objectListing truncated 
    JavaRDD<String> events = sc.textFile(String.join(",", keys))
    

    请注意,我将 s3n 切换到 s3a,因为如果您的 CP 上有 hadoop-awsamazon-sdk JAR,那么您应该使用 s3a 连接器。它更好,它是由人们(我)针对火花工作负载进行维护和测试的。见The history of Hadoop's S3 connectors

    【讨论】:

    • 感谢史蒂夫的回答,我正在尝试使用两种选项在 AWS EMR 上运行代码(使用我的自定义地图功能并将所有路径传递给 textFile(...),但我遇到了一些麻烦它可以正常运行。一旦我设法运行它并比较性能,我将更新这个线程。
    • 可爱!我尝试了 7 个 1GiB 的文件,使用 textFile(...) 效果很好(比我的自定义代码快 50%)。那么您能否用相应的 Java 代码更新您的回复,我会接受吗? String prefix = "s3n://" + properties.get("s3.source.bucket") + "/"; objectListing.getObjectSummaries().forEach(summery -&gt; keys.add(prefix+summery.getKey())); // repeat while truncatedJavaRDD&lt;String&gt; events = sc.textFile(String.join(",", keys));
    • 用 s3a 试过,对我来说很好用。需要 --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 作为 spark-submit 的参数,我认为代码中还有 sc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");。并且 EMR 应该具有 IAM 角色,该角色具有从存储桶读取/写入存储桶的权限。这样你就可以AmazonS3 s3 = new AmazonS3Client(); 并且信用将被自动拾取。
    • 如果有大量密钥,例如数十亿,这将不起作用
    【解决方案2】:

    您可以使用sc.textFile 读取多个文件。

    你可以传递multiple file url with 作为它的参数。

    您可以指定整个 directories,使用 wildcards 甚至 CSV 的目录和通配符。

    例如:

    sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
    

    Reference from this ans

    【讨论】:

    • 使用通配符的问题在于,它会导致多次调用 list(),这会导致 spark-job 长时间没有响应。这就是为什么建议首先获取所有密钥然后并行化它们的原因。见这里:tech.kinja.com/…
    【解决方案3】:

    我猜如果您在阅读 aws 时尝试并行化将使用执行器并肯定会提高性能

    val bucketName=xxx
    val keyname=xxx
    val df=sc.parallelize(new AmazonS3Client(new BasicAWSCredentials("awsccessKeyId", "SecretKey")).listObjects(request).getObjectSummaries.map(_.getKey).toList)
            .flatMap { key => Source.fromInputStream(s3.getObject(bucketName, keyname).getObjectContent: InputStream).getLines }
    

    【讨论】:

    • 这看起来不错。 listObjects 通常会分批返回结果,所以我假设会有一个循环。
    • 是的,这段代码需要更多的逻辑来处理被截断的列表。
    猜你喜欢
    • 1970-01-01
    • 2021-04-27
    • 2016-12-04
    • 2023-01-24
    • 2017-12-23
    • 1970-01-01
    • 2019-10-27
    • 1970-01-01
    • 2020-03-13
    相关资源
    最近更新 更多