【问题标题】:Using AWS Java S3 SDK TransferManager to resume an upload from a SFTP stream使用 AWS Java S3 SDK TransferManager 从 SFTP 流恢复上传
【发布时间】:2019-12-04 22:05:27
【问题描述】:

目前,我正在使用 Java 的 S3 SDK 中的 AWS 的 TransferManager 触发从 SFTP 服务器到 S3 的上传。我触发此上传的方式如下:

(伪代码...)

    @Autowired
    TransferManager transferManager;

    @Autowired
    SftpStreamFactory sftpStreamFactory;

    SftpStream sftpStream = sftpStreamFactory.createStream(filePath);
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentLength(sftpStream.getSizeBytes());
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, sftpStream.getStream(), objectMetadata);
    putObjectRequest.setGeneralProgressListener(new UploadBeginEndNotificationListener(uploadRequest, statusNotifier));
    
    transferManager.upload(putObjectRequest);

这是SftpStream的定义:

@AllArgsConstructor
public class SftpStreamFactory {

@Getter
@AllArgsConstructor
public static class SftpStream {
    private final long sizeBytes;
    private final InputStream stream;
}

private final SftpRemoteFileTemplate sftpTemplate;
private final SftpProperties sftpProperties;

public SftpStream createStream(Path relativePath) {
    return sftpTemplate.<SftpStream, ChannelSftp>executeWithClient(session -> createStream(session, relativePath));
}

SftpStream createStream(ChannelSftp channelSftp, Path relativePath) {

    String path = sftpProperties.getRoot().resolve(relativePath).toString();

    try {
        SftpATTRS fileAttrs = channelSftp.lstat(path);
        long size = fileAttrs.getSize();
        return new SftpStream(size, channelSftp.get(path));
    }
    catch (SftpException e) {
        throw new UncheckedIOException(new NestedIOException("SFTP Error", e));
    }
}

}

这种上传方式效果很好。但是,如果分段上传在中间被暂停/取消/以其他方式中止,我们希望从上次中断的地方继续,而不是重新开始。我们知道 TransferManagers resumeUpload 方法采用 PersistableUpload

然而,在 PersistableUpload 的 javadoc 中,它期望在构造函数中传递一个 file 路径,然后尝试从中创建一个 File 对象: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/PersistableUpload.html

我们想知道的是,是否有在没有这个文件对象的情况下恢复上传,我们无法从我们的ChannelSftp 获得?也就是说,我们可以从流而不是文件恢复上传吗?或者我们是否必须切换到使用低级 s3 api 来执行这样的恢复。任何建议表示赞赏。

编辑 - 查看更多内容,甚至为已经存在的上传传递一个 UploadId,如果没有文件,doUpload 方法将抛出异常。有什么想法吗?

【问题讨论】:

标签: java amazon-s3 spring-integration-sftp awss3transfermanager s3transfermanager


【解决方案1】:

答案是,不,没有文件就无法恢复上传,但是对于你的类似情况,有一个解决方法:

#在中止前暂停

  1. 当且仅当,在任何中断发生之前,尝试暂停连接
boolean forceCancel = true;
PauseResult<PersistableUpload> pauseResult = myUpload.tryPause(forceCancel);
  1. info to resume数据保存到文件
PersistableUpload persistableUpload = pauseResult.getInfoToResume();

File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE"); //blob
if (!f.exists())
    f.createNewFile();
FileOutputStream fos = new FileOutputStream(f);

// Serialize the persistable upload to the file.
persistableUpload.serialize(fos);
fos.close();
  1. 有时稍后再继续
TransferManager tm = new TransferManager();
FileInputStream fis = new FileInputStream(new File("UNIQUE-ID-FOR-UPLOADED-FILE"));

// Deserialize PersistableUpload information from disk.
PersistableUpload persistableUpload = PersistableTransfer.deserializeFrom(fis);

// Call resumeUpload with PersistableUpload.
tm.resumeUpload(persistableUpload);

fis.close();

#SAVE STREAM TO MAINTAIN FILE 如果发生某些事情(例如 JVM 崩溃)

使用S3SyncProgressListenerTransferManager#upload 来持久化每个更改并将数据序列化到磁盘。

transferManager.upload(putObjectRequest, new S3SyncProgressListener() {

    ExecutorService executor = Executors.newFixedThreadPool(1);

    @Override
    public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {

       executor.submit(new Runnable() {
          @Override
          public void run() {
              try {
                  File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE");
                  if (!f.exists()) {
                      f.createNewFile();
                  }
                  FileOutputStream fos = new FileOutputStream(f);
                  persistableTransfer.serialize(fos);
                  fos.close();
              } catch (IOException e) {
                  throw new RuntimeException("Unable to persist transfer to disk.", e);
              }
          }
       });
    }
});

希望对你有帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-21
    • 1970-01-01
    • 2020-01-08
    • 2019-04-07
    • 2020-04-18
    • 2021-10-27
    相关资源
    最近更新 更多