【问题标题】:Apache spark filtering files for processing in AWS S3用于在 AWS S3 中处理的 Apache Spark 过滤文件
【发布时间】:2018-03-06 05:52:30
【问题描述】:

我有一个任务,我正在接收应该用于处理的文件列表(每个文件的大小非常小)。 我的 AWS S3 存储桶中存储了数百万个此类文件,我只需要过滤和处理上述列表中存在的那些文件。

谁能告诉我在 Spark 中执行此操作的最佳实践?

例如。 XYZ 大学的 AWS S3 存储桶中存在数百万个文件。每个文件都有一个唯一的 ID 作为文件名。我得到了要处理的 1000 个唯一 ID 的列表。现在我只需要对这些文件进行处理以聚合并生成输出 csv 文件。

【问题讨论】:

标签: apache-spark amazon-s3 apache-spark-sql


【解决方案1】:

浏览了一段时间后,我明白了以下内容。

    String[] list={"s3a://path1/file1","s3a://path1/file2" ...}
    JavaRDD<String> readRDD=spark.read.json(list);

从 S3 读取对象,将其视为 HDFS 文件系统,从而降低性能。 因此,如果 S3 是源,那么下面的代码将在很大程度上优化性能,因为我们将使用 AWS 开发工具包从 S3 获取对象,然后创建相同的 RDD。

    String[] list={"file1","file2" ...};
    JavaRDD<String> readRDD=sc.parallelize(Arrays.asList(list))
            .map(file->{
                AmazonS3Client s3client= new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
                BufferedReader reader = new BufferedReader(new InputStreamReader(s3client.getObject("Bucket-Name", file).getObjectContent()));
                String line;
                StringBuilder sb=new StringBuilder();
                while((line=reader.readLine())!=null) {
                    sb.append(line);
                }
                return sb.toString();
            });

【讨论】:

    【解决方案2】:

    提供文件路径的逗号分隔列表 例如如果这些是 json 文件

    spark.read.json("s3a://path1/file1","s3a://path1/file2" ...)
    

    【讨论】:

    • 如果文件列表从 1000 增加到 1m,这会是一个很好的解决方案性能吗?
    • 如果您的列表很大,您可以使用 glob 语法(如果有模式)。如果文件数量接近所有文件,您可能会稍后过滤数据,否则您无论如何都需要打开所有文件。,
    猜你喜欢
    • 1970-01-01
    • 2020-01-01
    • 2018-09-30
    • 2017-01-10
    • 1970-01-01
    • 2022-01-05
    • 2015-10-13
    • 2016-11-17
    • 2018-03-19
    相关资源
    最近更新 更多