【问题标题】:Returning async stream of query results返回查询结果的异步流
【发布时间】:2014-07-20 08:06:08
【问题描述】:

我有以下 WebApi 方法,它从 RavenDB 返回一个无限的结果流:

public IEnumerable<Foo> Get()
{
    var query = DocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = DocumentSession.Advanced.Stream(query))
        while (enumerator.MoveNext())
            yield return enumerator.Current.Document;
}

现在我想让它异步。天真的方法当然行不通:

public async Task<IEnumerable<Location>> Get()
{
    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        while (await enumerator.MoveNextAsync())
            yield return enumerator.Current.Document;
}

...因为方法不能既是异步又是迭代器。

【问题讨论】:

  • 您可以实现自己的迭代器。 但是,在迭代器上调用MoveNext 也必须是异步的——这意味着你不能实现IEnumerable&lt;T&gt;,你必须定义你自己的接口。而且您也无法在 foreach 循环中使用该迭代器。
  • 是的,所有这些限制都是正确的。由于我只是返回这个(将由 WebApi 序列化),我不需要很大的灵活性。也许实现一个理解 Task&lt;IAsyncEnumerator&lt;StreamResult&lt;T&gt;&gt;&gt; 的 MediaTypeFormatter
  • @noseratio,问题类似,但不是重复的。
  • @noseratio 建议的解决方案将适用。但是由于我使用的是Web Api,所以我有机会直接使用格式化程序来支持IAsyncEnumerator,所以我可以避免使用助手。使用 yield 是一种解决方案,而不是要求。

标签: c# asp.net-web-api async-await ravendb


【解决方案1】:

由于这是一种 WebAPI 操作方法,HTTP 将您限制为单个响应。如果你只返回一个IEnumerable&lt;T&gt;,那么 ASP.NET 会在内存中枚举它,然后发送响应。

如果你对这个内存进程没问题,那么你可以自己做同样的事情:

public async Task<List<Location>> Get()
{
  var result = new List<Location>();
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
    while (await enumerator.MoveNextAsync())
      result.Add(enumerator.Current.Document);
  return result;
}

但是,我认为使用流式响应会更好,您可以通过PushStreamContent 获得;像这样:

public HttpResponseMessage Get()
{
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  HttpResponseMessage response = Request.CreateResponse();
  response.Content = new PushStreamContent(
      async (stream, content, context) =>
      {
        using (stream)
        using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        {
          while (await enumerator.MoveNextAsync())
          {
            // TODO: adjust encoding as necessary.
            var serialized = JsonConvert.SerializeObject(enumerator.CurrentDocument);
            var data = UTF8Encoding.UTF8.GetBytes(serialized);
            var countPrefix = BitConverter.GetBytes(data.Length);
            await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
            await stream.WriteAsync(data, 0, data.Length);
          }
        }
      });
  return response;
}

流式响应不需要您的服务器将整个响应保存在内存中;但是,您必须决定将文档写入响应流的正确方法。上面的示例代码只是将它们转换为 JSON,以 UTF8 编码,以及(二进制)长度前缀这些字符串。

【讨论】:

  • 这实际上不是一个坏主意(PushStream 那个)。它甚至可能比我的好一点,它读取异步但写入同步。也许两者结合会很酷。
【解决方案2】:

您可以实现自己的迭代器,而不是让编译器为您生成一个。

但是,在该迭代器上调用 MoveNext 也必须是异步的 - 这意味着您无法实现 IEnumerable&lt;T&gt;`IEnumerator, you'd have to define your own interface, e.g.,IAsyncEnumerator`。 而且您也无法在 foreach 循环中使用该迭代器。

在我看来,最好的办法是做StreamAsync 所做的事情。创建一个自定义类型IAsyncEnumerable,它返回一个实现自定义async T MoveNextAsync() 方法的IAsyncEnumerator&lt;T&gt;。 enumerable 将包装您的 query 对象,并且 enumerator 将获取文档会话的文档。

internal class AsyncDocumentEnumerable : IAsyncEnumerable<Document>
{
    private readonly YourQueryType _query;
    public AsyncDocumentEnumerable(YourQueryType query)
    {
        _query = query;
    }

    IAsyncEnumerator<Document> GetEnumerator()
    {
        return new AsyncDocumentEnumerator(_query);
    }
}


internal class AsyncDocumentEnumerator : IAsyncDocumentEnumerator<Document>
{
    private readonly YourQueryType _query;
    private IAsyncEnumerator<DocumentSession> _iter;

    public AsyncDocumentEnumerator(YourQueryType query)
    {
        _query = query;
    }

    public Task<bool> async MoveNextAsync()
    {
        if(_iter == null)
            _iter = await AsyncDocumentSession.Advanced.StreamAsync(query);

        bool moved = await _iter.MoveNextAsync();

        if(moved)
            Current = _iter.Current.Document;
        return moved;
    }

    public Document Current{get; private set;}
}

【讨论】:

  • 那行不通。 MoveNext 必须返回 Task&lt;bool&gt;,而不是 bool,所以我在同一个地方。
  • @DiegoMijelshon 哎呀,我的错!我就是这个意思。
  • 仍然不是太有用 - 我仍然有一个 WebApi 不能使用的异步枚举/枚举器,就像以前一样,但被包装了。请参阅我自己的答案。
【解决方案3】:

毕竟这并不难。解决方案是一个可以异步处理枚举器并将 JSON 写入流的格式化程序:

public class CustomJsonMediaTypeFormatter : JsonMediaTypeFormatter
{
    public override async Task WriteToStreamAsync(
           Type type, object value, Stream writeStream, HttpContent content,
           TransportContext transportContext, CancellationToken cancellationToken)
    {
        if (type.IsGenericType &&
            type.GetGenericTypeDefinition() == typeof(IAsyncEnumerator<>))
        {
            var writer = new JsonTextWriter(new StreamWriter(writeStream))
                         { CloseOutput = false };
            writer.WriteStartArray();
            await Serialize((dynamic)value, writer);
            writer.WriteEndArray();
            writer.Flush();
        }
        else
            await base.WriteToStreamAsync(type, value, writeStream, content,
                                          transportContext, cancellationToken);
    }

    async Task Serialize<T>(IAsyncEnumerator<StreamResult<T>> enumerator,
                            JsonTextWriter writer)
    {
        var serializer = JsonSerializer.Create(SerializerSettings);
        while (await enumerator.MoveNextAsync())
            serializer.Serialize(writer, enumerator.Current.Document);
    }
}

现在我的 WebApi 方法比以前更短了:

public Task<IAsyncEnumerator<StreamResult<Foo>>> Get()
{
    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    return AsyncDocumentSession.Advanced.StreamAsync(query);
}

【讨论】:

    【解决方案4】:

    他们在 C#8 中引入了IAsyncEnumerable&lt;int&gt;

        async IAsyncEnumerable<int> GetBigResultsAsync()
        {
            await foreach (var result in GetResultsAsync())
            {
                if (result > 20) yield return result; 
            }
        }
    

    【讨论】:

      【解决方案5】:

      您可以查看ReactiveExtensions for .Net,它们是专门为您的需求而设计的。最终结果可能如下所示:

      public IObservable<Location> Get()
              {
                  var locations = new Subject<Location>();
      
                  Task.Run(() =>
                           {
                               var query = DocumentSession.Query<Foo, FooIndex>();
                               foreach (var document in DocumentSession.Advanced.Stream(query))
                               {
                                   locations.OnNext(document);
                               }
                               locations.OnCompleted();
                           });
      
                  return locations;
              }
      

      【讨论】:

      • 你只是在那里包装了一个非异步调用。它违背了异步的目的(在 IO 发生时不使用线程)
      • 然后不要换行。我刚刚做了一个使用同步 API 的例子。如果你有异步 API,你肯定不需要任何 Task.Run() 调用。有很多方法可以从任何类型的 API(同步和异步)创建 IObservable&lt;Location&gt; 序列。
      • 我刚刚意识到 AsyncDocumentSession 是 RavenDB 客户端公开的东西,而不是您的自定义东西......抱歉,我以前从未见过这个,所以我无法使用 @987654326 创建工作原型@ 很容易,但我仍然相信 IObservable 非常适合您的问题。
      猜你喜欢
      • 2021-10-16
      • 2018-01-17
      • 1970-01-01
      • 2019-11-20
      • 2014-10-02
      • 2020-06-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多