【Posted】: 2017-12-29 02:18:49
【Question】:
I use AWS S3 as backup storage for data flowing into a Spark cluster. Data arrives once per second and is processed in batches of 10 seconds' worth. Each RDD holding 10 seconds of data is stored to S3 with
rdd.saveAsObjectFile(s3URL + dateFormat.format(new Date()));
This means that every day a large number of files are added to S3, with keys like
S3URL/2017/07/23/12/00/10, S3URL/2017/07/23/12/00/20, and so on.
From there it is easy to restore the RDD, which is a
JavaRDD<byte[]>
using either
sc.objectFile or the AmazonS3 API.
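The question does not show the actual dateFormat, but judging from the example keys (2017/07/23/12/00/10), it is presumably something like the following sketch. The pattern string, bucket URL, and timestamp are all my assumptions, not taken from the question:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class KeyFormat {
    public static void main(String[] args) {
        // Hypothetical pattern matching the keys shown in the question
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm/ss");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        String s3URL = "s3a://mybucket/logs/";        // placeholder bucket/prefix
        Date batchTime = new Date(1500811210000L);     // 2017-07-23 12:00:10 UTC
        String key = s3URL + dateFormat.format(batchTime);
        System.out.println(key);
        // rdd.saveAsObjectFile(key) would then create .../2017/07/23/12/00/10/part-00000
    }
}
```

Each 10-second batch therefore lands under its own second-resolution prefix, which is why the file count grows so quickly.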
The problem is that, in order to reduce the number of files we have to iterate over, we run a cron job once a day that walks through every file from that day, bundles the data together and stores the new RDD back to S3. It does this as follows:
List<byte[]> dataList = new ArrayList<>(); // A list of all read messages

/* Get all messages from S3 and store them in the above list */
try {
    final ListObjectsV2Request req = new ListObjectsV2Request()
            .withBucketName("bucketname")
            .withPrefix("logs/" + dateString);
    ListObjectsV2Result result;
    do {
        result = s3Client.listObjectsV2(req);
        for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
            System.out.println(" - " + objectSummary.getKey() +
                    " (size = " + objectSummary.getSize() + ")");
            if (objectSummary.getKey().contains("part-00000")) { // The messages are stored in files named "part-00000"
                S3Object object = s3Client.getObject(
                        new GetObjectRequest(objectSummary.getBucketName(), objectSummary.getKey()));
                InputStream objectData = object.getObjectContent();
                byte[] byteData = new byte[(int) objectSummary.getSize()]; // The sizes of the messages differ
                // read() may return before filling the buffer, so loop until all bytes are read
                int offset = 0;
                while (offset < byteData.length) {
                    int read = objectData.read(byteData, offset, byteData.length - offset);
                    if (read == -1) break;
                    offset += read;
                }
                dataList.add(byteData); // Add the message to the list
                objectData.close();
            }
        }
        /* Listing results are paginated; the continuation token must be passed
         * back until the result is no longer truncated. */
        System.out.println("Next Continuation Token : " + result.getNextContinuationToken());
        req.setContinuationToken(result.getNextContinuationToken());
    } while (result.isTruncated());
} catch (AmazonServiceException ase) {
    System.out.println("Caught an AmazonServiceException, which means your request made it " +
            "to Amazon S3, but was rejected with an error response for some reason.");
    System.out.println("Error Message: " + ase.getMessage());
    System.out.println("HTTP Status Code: " + ase.getStatusCode());
    System.out.println("AWS Error Code: " + ase.getErrorCode());
    System.out.println("Error Type: " + ase.getErrorType());
    System.out.println("Request ID: " + ase.getRequestId());
} catch (AmazonClientException ace) {
    System.out.println("Caught an AmazonClientException, which means the client encountered " +
            "an internal error while trying to communicate with S3, " +
            "such as not being able to access the network.");
    System.out.println("Error Message: " + ace.getMessage());
} catch (IOException e) {
    e.printStackTrace();
}

JavaRDD<byte[]> messages = sc.parallelize(dataList); // Load the messages into an RDD
messages.saveAsObjectFile("S3URL/daily_logs/" + dateString);
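Note that InputStream.read(byte[]) is not guaranteed to fill the whole buffer in a single call, which matters when pulling entire S3 objects into memory as above. A minimal, self-contained sketch of a read-until-full helper (the name readFully is my own, not from the question):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class ReadFully {
    // Reads bytes until the buffer is full or the stream ends.
    static int readFully(InputStream in, byte[] buffer) throws IOException {
        int offset = 0;
        while (offset < buffer.length) {
            int n = in.read(buffer, offset, buffer.length - offset);
            if (n == -1) break; // end of stream reached early
            offset += n;
        }
        return offset; // number of bytes actually read
    }

    public static void main(String[] args) throws IOException {
        byte[] source = new byte[1024];
        for (int i = 0; i < source.length; i++) source[i] = (byte) (i % 128);
        byte[] target = new byte[source.length];
        int read = readFully(new ByteArrayInputStream(source), target);
        System.out.println(read == 1024 && Arrays.equals(source, target));
    }
}
```

With a single unchecked read() call, a large part-file could silently come back truncated and corrupt the bundled RDD.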
This all works fine, but now I am not sure how to actually restore the data to a manageable state. If I use
sc.objectFile
to restore the RDD, I end up with a JavaRDD<byte[]> where each byte[] is actually itself a serialized JavaRDD. How do I restore the nested JavaRDD from the byte[] elements sitting inside the outer JavaRDD?
I hope this makes sense, and I appreciate any help. Worst case, I will have to come up with another way to back up the data.
Best regards, Mathias
【Discussion】:
Tags: java apache-spark amazon-s3 rdd