如果将操作分解为一个异步处理一个请求然后调用它100次的方法,这可能会更容易。
首先,让我们确定您想要的最终结果。由于您将使用的是MemoryStream,这意味着您需要从您的方法中返回Task<MemoryStream>。签名将如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
因为您的AmazonS3 对象实现了Asynchronous Design Pattern,所以您可以使用TaskFactory class 上的FromAsync method 从实现异步设计模式的类中生成Task<T>,如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);
// But what goes here?
所以你已经在一个好地方,你有一个Task<T>,你可以等待或在通话完成时得到回叫。但是,您需要以某种方式将从对Task<GetObjectResponse> 的调用返回的GetObjectResponse 转换为MemoryStream。
为此,您希望在 Task<T> 类上使用 ContinueWith method。将其视为Enumerable class 上Select method 的异步版本,它只是对另一个Task<T> 的投影,除了每次调用ContinueWith 时,您可能会创建一个运行的新任务 部分代码。
这样,您的方法如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);
// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t ){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});
// Return the full task chain.
return translation;
}
请注意,在上述情况下,您可以通过 TaskContinuationOptions.ExecuteSynchronously 调用 overload of ContinueWith,因为看起来您所做的工作很少(我无法确定,响应可能巨大)。如果您正在做的工作非常少,而为了完成工作而开始一项新任务是有害的,您应该传递TaskContinuationOptions.ExecuteSynchronously,这样您就不会浪费时间为最少的操作创建新任务。
现在您有了可以将一个请求转换为Task<MemoryStream>的方法,创建一个可以处理任意个请求的包装器很简单:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}
在上面,您只需获取GetObjectRequest 实例的序列,它将返回Task<MemoryStream> 数组。它返回物化序列这一事实很重要。如果你在返回之前没有实现它,那么在序列被迭代之前不会创建任务。
当然,如果您想要这种行为,那么无论如何,只需删除对.ToArray() 的调用,让方法返回IEnumerable<Task<MemoryStream>>,然后在您遍历任务时发出请求。
从那里,您可以一次处理一个(在循环中使用Task.WaitAny method)或等待所有这些完成(通过调用Task.WaitAll method)。后者的一个例子是:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}
另外,应该提到的是,这非常适合Reactive Extensions framework,因为这个非常非常适合IObservable<T> 实现。