【发布时间】:2017-04-25 01:36:43
【问题描述】:
我看到了一些关于此的讨论,但不太了解正确的解决方案: 我想将几百个文件从 S3 加载到 RDD 中。这是我现在的做法:
ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
withBucketName(...).
withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()
JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));
ReadFromS3Function 使用AmazonS3 客户端进行实际读取:
public Iterator<String> call(String s) throws Exception {
AmazonS3 s3Client = getAmazonS3Client(properties);
S3Object object = s3Client.getObject(new GetObjectRequest(...));
InputStream is = object.getObjectContent();
List<String> lines = new LinkedList<>();
String str;
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
if (is != null) {
while ((str = reader.readLine()) != null) {
lines.add(str);
}
} else {
...
}
} finally {
...
}
return lines.iterator();
我从我在 Scala 中看到的相同问题的答案中“翻译”了这一点。我认为也可以将整个路径列表传递给sc.textFile(...),但我不确定哪个是最佳实践方式。
【问题讨论】:
标签: java apache-spark amazon-s3