【问题标题】:Parallel batch file download from Amazon S3 using AWS S3 SDK for .NET使用适用于 .NET 的 AWS S3 开发工具包从 Amazon S3 并行下载批处理文件
【发布时间】:2012-05-16 05:46:40
【问题描述】:

问题:我想使用他们的 .NET SDK 从 AWS S3 并行下载 100 个文件。下载的内容应该存储在 100 个内存流中(文件足够小,我可以从那里获取)。我对 Task、IAsyncResult、Parallel.* 和 .NET 4.0 中的其他不同方法感到困惑。

如果我尝试自己解决问题,我脑海中浮现出类似这样的伪代码: (编辑为一些变量添加类型)

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq);

// Finish requests
var actualResponses = responses.ToList();

// Fetch data
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms;
});

此代码并行启动 100 个请求,这很好。但是,有两个问题:

  1. 最后一条语句将串行下载文件,而不是并行下载。流中似乎没有 BeginCopyTo()/EndCopyTo() 方法...
  2. 在所有请求都响应之前,前面的语句不会放手。换句话说,在所有文件都开始之前,所有文件都不会开始下载。

所以我开始认为我走错了路……

帮助?

【问题讨论】:

    标签: c# .net c#-4.0 amazon-s3 amazon-web-services


    【解决方案1】:

    如果将操作分解为一个异步处理一个请求然后调用它100次的方法,这可能会更容易。

    首先,让我们确定您想要的最终结果。由于您将使用的是MemoryStream,这意味着您需要从您的方法中返回Task&lt;MemoryStream&gt;。签名将如下所示:

    static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
        GetObjectRequest request)
    

    因为您的AmazonS3 对象实现了Asynchronous Design Pattern,所以您可以使用TaskFactory class 上的FromAsync method 从实现异步设计模式的类中生成Task&lt;T&gt;,如下所示:

    static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
        GetObjectRequest request)
    {
        Task<GetObjectResponse> response = 
            Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
                s3.BeginGetObject, s3.EndGetObject, request, null);
    
        // But what goes here?
    

    所以你已经在一个好地方,你有一个Task&lt;T&gt;,你可以等待或在通话完成时得到回叫。但是,您需要以某种方式将从对Task&lt;GetObjectResponse&gt; 的调用返回的GetObjectResponse 转换为MemoryStream

    为此,您希望在 Task&lt;T&gt; 类上使用 ContinueWith method。将其视为Enumerable classSelect method 的异步版本,它只是对另一个Task&lt;T&gt; 的投影,除了每次调用ContinueWith 时,您可能会创建一个运行的新任务 部分代码。

    这样,您的方法如下所示:

    static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
        GetObjectRequest request)
    {
        // Start the task of downloading.
        Task<GetObjectResponse> response = 
            Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
                s3.BeginGetObject, s3.EndGetObject, request, null
            );
    
        // Translate.
        Task<MemoryStream> translation = response.ContinueWith(t => {
            using (Task<GetObjectResponse> resp = t ){
                var ms = new MemoryStream(); 
                t.Result.ResponseStream.CopyTo(ms); 
                return ms;
            } 
        });
    
        // Return the full task chain.
        return translation;
    }
    

    请注意,在上述情况下,您可以通过 TaskContinuationOptions.ExecuteSynchronously 调用 overload of ContinueWith,因为看起来您所做的工作很少(我无法确定,响应可能巨大)。如果您正在做的工作非常少,而为了完成工作而开始一项新任务是有害的,您应该传递TaskContinuationOptions.ExecuteSynchronously,这样您就不会浪费时间为最少的操作创建新任务。

    现在您有了可以将一个请求转换为Task&lt;MemoryStream&gt;的方法,创建一个可以处理任意个请求的包装器很简单:

    static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
        IEnumerable<GetObjectRequest> requests)
    {
        // Just call Select on the requests, passing our translation into
        // a Task<MemoryStream>.
        // Also, materialize here, so that the tasks are "hot" when
        // returned.
        return requests.Select(r => GetMemoryStreamAsync(s3, r)).
            ToArray();
    }
    

    在上面,您只需获取GetObjectRequest 实例的序列,它将返回Task&lt;MemoryStream&gt; 数组。它返回物化序列这一事实很重要。如果你在返回之前没有实现它,那么在序列被迭代之前不会创建任务。

    当然,如果您想要这种行为,那么无论如何,只需删除对.ToArray() 的调用,让方法返回IEnumerable&lt;Task&lt;MemoryStream&gt;&gt;,然后在您遍历任务时发出请求。

    从那里,您可以一次处理一个(在循环中使用Task.WaitAny method)或等待所有这些完成(通过调用Task.WaitAll method)。后者的一个例子是:

    static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
        IEnumerable<GetObjectRequest> requests)
    {
        Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
        Task.WaitAll(tasks);
        return tasks.Select(t => t.Result).ToList();
    }
    

    另外,应该提到的是,这非常适合Reactive Extensions framework,因为这个非常非常适合IObservable&lt;T&gt; 实现。

    【讨论】:

    • 这是一个很棒的解决方案,描述得非常好,并在我发布问题后大约 20 分钟内交付。我很开心。在我进行修复以添加更准确的 S3 类名称并指定更具体的 FromAsync() 方法后,它对我也很有效。 Casper,你想让我修改你的答案吗?
    • @DenNukem 哦,我没有解决从一个流异步复制到另一个流的问题。这将在 .NET 4.5 中可用,但需要一些 async/await 善良才能让它看起来不像火车残骸。现在,使用Stream.CopyTo method,但要知道在.NET 4.5 中,您可以使用Stream.CopyToAsyncasync/await 来更优雅地完成所有
    • @casperOne 我可以看到一个 .NET 4.5 实现方式的示例吗?
    • @user1265146 你收到了吗?
    猜你喜欢
    • 1970-01-01
    • 2015-08-09
    • 2021-12-09
    • 2018-09-06
    • 1970-01-01
    • 1970-01-01
    • 2020-06-05
    • 2021-04-11
    • 2011-11-10
    相关资源
    最近更新 更多