【Question Title】: Spark - Restore nested saved RDD
【Posted】: 2017-12-29 02:18:49
【Question Description】:

I use AWS S3 as backup storage for data entering a Spark cluster. Data arrives once per second and is processed in batches of 10 seconds. The RDD containing 10 seconds of data is stored to S3 using

rdd.saveAsObjectFile(s3URL + dateFormat.format(new Date()));

which means that every day many files are added to S3 in the format

S3URL/2017/07/23/12/00/10, S3URL/2017/07/23/12/00/20, and so on.
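The timestamped keys above suggest a formatter pattern along the following lines (the pattern string is an assumption; the question only shows the resulting key shape, not the actual `dateFormat` definition):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class KeyFormatDemo {
    public static void main(String[] args) {
        // Hypothetical pattern reproducing the key shape from the question.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm/ss");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // fix the zone so keys are reproducible
        // 2017-07-23T12:00:10Z expressed as epoch milliseconds
        String key = dateFormat.format(new Date(1500811210000L));
        System.out.println(key); // prints "2017/07/23/12/00/10"
    }
}
```

One key per 10-second batch means roughly 8,640 objects per day under this prefix, which is what motivates the daily compaction job described below.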

From here it is easy to restore the RDD, which is a JavaRDD, using either sc.objectFile or the AmazonS3 API.

The problem is that, to reduce the number of files we have to iterate over, we run a daily cron job that walks through every file, bundles the data together, and stores the new RDD back to S3. This is done as follows:

    List<byte[]> dataList = new ArrayList<>(); // A list of all read messages
    /* Get all messages from S3 and store them in the above list */
    try {
        final ListObjectsV2Request req = new ListObjectsV2Request()
                .withBucketName("bucketname")
                .withPrefix("logs/" + dateString);
        ListObjectsV2Result result;
        do {
            result = s3Client.listObjectsV2(req);
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
                System.out.println(" - " + objectSummary.getKey() + "  " +
                        "(size = " + objectSummary.getSize() + ")");
                if (objectSummary.getKey().contains("part-00000")) { // The messages are stored in files named "part-00000"
                    S3Object object = s3Client.getObject(
                            new GetObjectRequest(objectSummary.getBucketName(), objectSummary.getKey()));
                    try (InputStream objectData = object.getObjectContent()) {
                        byte[] byteData = new byte[(int) objectSummary.getSize()]; // The sizes of the messages differ
                        // A single read() may return fewer bytes than requested,
                        // so loop until the whole object has been consumed.
                        int offset = 0;
                        while (offset < byteData.length) {
                            int read = objectData.read(byteData, offset, byteData.length - offset);
                            if (read == -1) break;
                            offset += read;
                        }
                        dataList.add(byteData); // Add the message to the list
                    }
                }
            }
            /* Listings are paginated into pages linked by continuation tokens.
             * All tokens have to be followed to get all messages. */
            System.out.println("Next Continuation Token : " + result.getNextContinuationToken());
            req.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());
    } catch (AmazonServiceException ase) {
        System.out.println("Caught an AmazonServiceException, which means the request " +
                "made it to Amazon S3 but was rejected with an error response.");
        System.out.println("Error Message:    " + ase.getMessage());
        System.out.println("HTTP Status Code: " + ase.getStatusCode());
        System.out.println("AWS Error Code:   " + ase.getErrorCode());
        System.out.println("Error Type:       " + ase.getErrorType());
        System.out.println("Request ID:       " + ase.getRequestId());
    } catch (AmazonClientException ace) {
        System.out.println("Caught an AmazonClientException, which means the client " +
                "encountered an internal error while trying to communicate with S3, " +
                "such as not being able to access the network.");
        System.out.println("Error Message: " + ace.getMessage());
    } catch (IOException e) {
        e.printStackTrace();
    }
    JavaRDD<byte[]> messages = sc.parallelize(dataList); // Load the messages into an RDD
    messages.saveAsObjectFile("S3URL/daily_logs/" + dateString);

This all works fine, but now I am not sure how to actually restore the data to a manageable state. If I use

sc.objectFile

to restore the RDD, I end up with a JavaRDD<byte[]> where each byte[] is effectively a serialized JavaRDD itself. How do I restore the nested JavaRDD from a byte[] that sits inside a JavaRDD?
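The nesting can be illustrated with plain Java serialization (a toy stand-in only — Spark's saveAsObjectFile actually writes Hadoop SequenceFiles, so real part-file bytes cannot be decoded this way): each element of the daily RDD is an opaque blob that must itself be decoded a second time before the individual messages are usable.

```java
import java.io.*;
import java.util.*;

public class NestedBytesDemo {
    // Bundle a list of messages into one blob, as a toy stand-in for a part-file.
    static byte[] bundle(List<byte[]> messages) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ArrayList<>(messages));
        }
        return bos.toByteArray();
    }

    // Decode one blob back into its messages — the extra step the nesting forces.
    @SuppressWarnings("unchecked")
    static List<byte[]> unbundle(byte[] fileBytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(fileBytes))) {
            return (List<byte[]>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> messages = Arrays.asList("a".getBytes(), "b".getBytes());
        byte[] outer = bundle(messages);         // what ends up as one element of the daily RDD
        List<byte[]> restored = unbundle(outer); // the inner messages need a second decode
        System.out.println(new String(restored.get(0)) + new String(restored.get(1))); // prints "ab"
    }
}
```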

I hope this makes sense, and I appreciate any help. In the worst case I will have to come up with another way to back up the data.

Best regards, Mattias

【Discussion】:

    Tags: java apache-spark amazon-s3 rdd


    【Solution 1】:

    Instead of storing nested RDDs, I solved this by flat-mapping all the byte[]s into a single JavaRDD and storing that instead.
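The fix can be illustrated without a Spark cluster (a plain-Java sketch; the per-file message lists stand in for whatever decoding step the job actually uses): flattening the nested lists with `Stream.flatMap` mirrors what `JavaRDD.flatMap` does when each file's messages are emitted individually instead of being kept as one serialized blob per element.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenDemo {
    // Flatten per-file lists of messages into one flat list — the plain-Java
    // analogue of calling flatMap on the RDD instead of nesting one
    // serialized RDD per byte[] element.
    static List<byte[]> flatten(List<List<byte[]>> perFile) {
        return perFile.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<byte[]>> perFile = Arrays.asList(
                Arrays.asList("m1".getBytes(), "m2".getBytes()),
                Arrays.asList("m3".getBytes()));
        System.out.println(flatten(perFile).size()); // prints 3
    }
}
```

The resulting flat RDD restores cleanly with a single sc.objectFile call, since every element is now an individual message rather than a serialized container.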

    【Discussion】:
