【问题标题】:How can I stream a massive object directly to S3, without a MemoryStream or local file?如何在没有 MemoryStream 或本地文件的情况下将大量对象直接流式传输到 S3?
【发布时间】:2021-07-07 21:02:21
【问题描述】:

我正在尝试将一个大型对象写入 AWS S3(例如 25 GB)。

目前我可以通过两种方式让它工作:

  • 将内容写入本地磁盘上的文件,然后使用multi-part upload将文件发送到S3
  • 将内容写入MemoryStream,然后使用分段上传将该流发送到 S3

但是,我不喜欢这两种方法,因为我需要为操作保留大量磁盘空间或内存。我在代码中生成这个内容,所以我希望打开一个到 S3 对象的流,并将内容直接生成到该对象。但我不知道如何让它发挥作用。

是否可以在不先将整个对象表示在本地文件或内存中的情况下在 S3 中构建一个海量对象?

(注意:我的问题与this question 非常相似,但该问题没有有用的答案。)

【问题讨论】:

  • 打开一个流到大量数据,从流中读取一小块数据,将小块写入另一个流,重复。如果您想要轻松的生活,可能一次只能读/写一个字节; IO 系统的其余部分将缓冲数据,因此您无需对自己的 c# 字节 [] 缓冲区感到时髦
  • @CaiusJard 根据您的评论,我得到了一些工作。我发布了我的解决方案作为这个问题的答案。这对你来说是正确的吗? (从概念上讲,你所说的非常有道理;我只是想知道如何实现它。)

标签: c# .net amazon-web-services amazon-s3 aws-sdk


【解决方案1】:

我能够通过将整个有效负载分成块并将每个单独的块作为单独的 MemoryStream 发送来使其工作。

从技术上讲,这个解决方案仍然使用MemoryStream,但这没关系,因为我可以通过调整块大小来控制使用多少内存。对于我的测试,我创建了一个 25GB 的文件,同时将内存使用量保持在远低于该值 (~2 GB IIRC)。

这是我的解决方案:

private const string BucketName = "YOUR-BUCKET-NAME-HERE";
private static readonly RegionEndpoint BucketRegion = RegionEndpoint.USEast1;
private const string Key = "massive-file-test";

// We're going to send 100 chunks of 256 MB each, for a total of 25 GB.
// The content will be the asterisk ("*") repeated for the desired size.
private const int ChunkSizeMb = 256;
private const int TotalSizeGb = 25;

public static void Main(string[] args)
{
    Console.WriteLine($"Writing object to {BucketName}, {Key}");

    int totalChunks = TotalSizeGb * 1024 / ChunkSizeMb;
    int chunkSizeBytes = ChunkSizeMb * 1024 * 1024;
    string payload = new String('*', chunkSizeBytes);

    // Initiate the request.
    InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest
    {
        BucketName = BucketName,
        Key = Key
    };

    List<UploadPartResponse> uploadResponses = new List<UploadPartResponse>();
    IAmazonS3 s3Client = new AmazonS3Client(BucketRegion);
    InitiateMultipartUploadResponse initResponse = s3Client.InitiateMultipartUpload(initiateRequest);

    // Open a stream to build the input.
    for (int i = 0; i < totalChunks; i++)
    {
        // Write the next chunk to the input stream.
        Console.WriteLine($"Writing chunk {i} of {totalChunks}");
        using (var stream = ToStream(payload))
        {
            // Write the next chunk to s3.
            UploadPartRequest uploadRequest = new UploadPartRequest
            {
                BucketName = BucketName,
                Key = Key,
                UploadId = initResponse.UploadId,
                PartNumber = i + 1,
                PartSize = chunkSizeBytes,
                InputStream = stream,
            };

            uploadResponses.Add(s3Client.UploadPart(uploadRequest));
        }
    }

    // Complete the request.
    CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest
    {
        BucketName = BucketName,
        Key = Key,
        UploadId = initResponse.UploadId
    };

    completeRequest.AddPartETags(uploadResponses);
    s3Client.CompleteMultipartUpload(completeRequest);

    Console.WriteLine("Script is complete. Press any key to exit...");
    Console.ReadKey();
}

private static Stream ToStream(string s)
{
    var stream = new MemoryStream();
    var writer = new StreamWriter(stream);
    writer.Write(s);
    writer.Flush();
    stream.Position = 0;
    return stream;
}

【讨论】:

  • 即便如此,这也意味着您必须创建一个 256 mb 的字符串,然后将其复制到内存流中,如果您有多个这样的字符串,那么它会无缘无故地消耗大量内存;编写自己的流
  • 哦.. 我看到 AnonCoward 采用了这种方法,但没有完成流类 - 只需添加缺少的方法/属性以使其可搜索
【解决方案2】:

这是 AnonCoward 开始的,通过添加 seek 结束 - 对于一个除了将星号写入其缓冲区之外什么都不做的流来说,这是一个微不足道的操作。如果您要生成更复杂的数据,这将是一项艰巨的工作,但要寻找所有您需要做的事情就是设置位置并说“是的,完成了”,因为无论您在流中的哪个位置寻找创建星号的行为始终是一样的

class AsteriskGeneratingStream : Stream
{
    long _pos = 0;
    long _length = 0;
    public AsteriskGeneratingStream(long length)
    {
        _length = length;
    }
    public override long Length => _length;

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Create the data as needed
        if (count + _pos > _length)
            count = (int)(_length - _pos);
        
        for (int i = offset; i < count; i++)
            buffer[i] = (byte)'*';
        
        _pos += count;
        return count;
    }

    public override bool CanRead => true;

    public override long Seek(long offset, SeekOrigin origin)
    { 
        if(origin == SeekOrigin.Begin) //lets just trust that the caller will be sensible and not set e.g. negative offset
            _pos = offset;
        else if(origin == SeekOrigin.Current)
            _pos += offset;
        else if(origin == SeekOrigin.End)
            _pos = _length + offset;

        return _pos;
    }

    public override bool CanSeek => true;
    public override bool CanWrite => false;
    public override long Position { get => _pos; set => _pos = value; }
    public override void Flush() { }
    public override void SetLength(long value) { _length = value; }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); }
}

class Program
{
    static void Main(string[] args)
    {
        long objectSize = 25L * 1024 * 1024;
        var s3 = new AmazonS3Client(Amazon.RegionEndpoint.USWest1);
        var xfer = new TransferUtility(s3,new TransferUtilityConfig
        {
            MinSizeBeforePartUpload = 5L * 1024 * 1024
        });

        var helper = new AsteriskGeneratingStream(objectSize);
        xfer.Upload(helper, "bucket-name", "object-key");
    }
}

注意,我不能保证它会立即工作,因为我在手机上,无法通过 c# fiddle 测试它,但让我们看看它是如何爆炸的! ?

【讨论】:

  • 谢谢,这很有帮助。当然,我的实际用例比星号示例更复杂。我不知道前面的内容长度,所以我不清楚如何实现可搜索的 Stream。我会对此进行一些研究,也许会提出一个新问题。
  • 如果它与数据相关,你只需要预先知道长度,如果 S3 甚至会调用寻找使用 End 作为参考。虽然我不太清楚你怎么能如果您不知道 X,希望上传大小为 X 的文件 - 系统中某处的某个人/事物肯定知道 X
【解决方案3】:

如果您可以动态创建对象,或者至少缓存相当小的段,则可以创建一个流,将数据提供到 S3。请注意,除非您还可以乱序创建对象的任何部分,否则您需要防止 AWS SDK 使用分段上传,这会降低传输速度。

    class DataStream : Stream
    {
        long _pos = 0;
        long _length = 0;
        public DataStream(long length)
        {
            _length = length;
        }
        public override long Length => _length;

        public override int Read(byte[] buffer, int offset, int count)
        {
            // Create the data as needed, on demand
            // For this example, just cycle through 0 to 256 in the data over and over again
            if (count + _pos > _length)
            {
                count = (int)(_length - _pos);
            }
            for (int i = 0; i < count; i++)
            {
                buffer[i + offset] = (byte)((_pos + i) % 256);
            }
            _pos += count;
            return count;
        }

        public override bool CanRead => true;

        // Stub out all other methods.  For a seekable stream
        // Seek() and Postion need to be implemented, along with CanSeek changed
        public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); }
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Position { get => _pos; set => throw new NotImplementedException(); }
        public override void Flush() { throw new NotImplementedException(); }
        public override void SetLength(long value) { throw new NotImplementedException(); }
        public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); }
    }

    class Program
    {
        static void Main(string[] args)
        {
            long objectSize = 25L * 1024 * 1024;
            var s3 = new AmazonS3Client(Amazon.RegionEndpoint.USWest1);
            // Prevent a multi-part upload, which requires a seekable stream
            var xfer = new TransferUtility(s3, new TransferUtilityConfig
            {
                MinSizeBeforePartUpload = objectSize + 1
            });

            var helper = new DataStream(objectSize);
            xfer.Upload(helper, "bucket-name", "object-key");
        }
    }

【讨论】:

  • 谢谢,这个解决方案看起来很有希望,但不幸的是对我不起作用。当我尝试创建一个 25 GB 的文件时,我得到一个 S3Exception Your proposed upload exceeds the maximum allowed size。看起来每次上传最多 5GB。当我调整 MinSizeBeforePartUpload 以保持在 5GB 限制以下(总体仍为 25GB)时,我得到了另一个错误 System.InvalidOperationException: 'Base stream of PartialWrapperStream must be seekable'
  • 是的,我在发布后不久就意识到它不适用于超过 5gb 的大小。正如我所说,您需要将查找相关的方法添加到您的流实现中,并且能够处理任意查找。最重要的是,如果部分上传失败,您还需要支持寻求后退。所以,除非你的流创建方法支持我在示例中的琐碎案例,否则它可能不可行。
  • 我会把答案留在这里,以防它对其他人有用,但希望有人能为你想出一个更简单的案例。
  • 是的,这就是我要采取的方法。 @srk 只是在流上实现搜索 - 这并不难 - 当被告知要搜索时,只需为传入的任何来源相应地设置 _pos
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-12-19
  • 2023-02-23
  • 2019-11-12
  • 1970-01-01
  • 2015-06-22
  • 1970-01-01
  • 2020-10-12
相关资源
最近更新 更多