【问题标题】:Using async / await with DataReader ? ( without middle buffers!)将 async / await 与 DataReader 一起使用? (没有中间缓冲区!)
【发布时间】:2014-07-14 06:45:28
【问题描述】:

我的目标很简单,我想做异步 I/O 调用(使用 async await) - 但是:

好的。

目前这是我的代码,它的工作是从 db 读取并将每一行投影到 Func<>

public IEnumerable < T > GetSomeData < T > (string sql, Func < IDataRecord, T > projector)
{
    using(SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using(SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            _conn.Open();
            _cmd.CommandTimeout = 100000;
            using(IDataReader rdr = _cmd.ExecuteReader())
            {
                while (rdr.Read())  yield    return projector(rdr);
            }
        }
    }
}

那么,什么是投影仪?

每个类都有一个函数,它获取一个record (IDataRecord) 并创建一个实体:

例子:

public class MyClass
{
    public static MyClass MyClassFactory(IDataRecord record)
    {
        return new MyClass
        {
            Name = record["Name"].ToString(),
            Datee = DateTime.Parse(record["Datee"].ToString()),
            val = decimal.Parse(record["val"].ToString())
        };
    }
    public string Name    {   get;   set;  }
    public DateTime Datee    {  get;     set;  }
    public decimal val    {  get;    set;    }
}

所以在这里,MyClassFactory 将是 Func

那么我目前如何运行它?

 var sql = @"SELECT TOP 1000 [NAME],[datee] ,[val]  FROM [WebERP].[dbo].[t]";
 var a = GetSomeData < MyClass > (sql, MyClass.MyClassFactory).Where(...); //notice the Func

一切正常。

问题从现在开始:

在方法中添加async 会产生错误:(是的,我知道 Ienumerable 是一个 Synchronous 接口,因此存在问题)

public async Task&lt;IEnumerable &lt; T &gt;&gt; GetSomeData &lt; T &gt; (string sql, Func &lt; IDataRecord, T &gt; projector)

不能是迭代器块,因为 'System.Threading.Tasks.Task>' 不是迭代器接口类型

But this guy here did-:

可以编译

问题

如何转换我的代码以支持完全异步的 IO 调用?

(条件:不依赖DataFlow,发送投影函数作为参数,无中间缓冲区)

【问题讨论】:

  • 对于这种情况,如果 C# 支持 IAsyncEnumerator,那就太好了。如果您可以以完全异步的方式返回一个急切填写的列表,那么问题就会变得容易得多。
  • ToArray 确实创建了一个中间缓冲区。
  • 您的代码迭代。编译的代码没有。 IEnumerable 不是问题。迭代(即:yield return)是问题所在。缓冲区有什么问题?
  • @RoyiNamir 您必须向调用者公开每个元素的异步性。 IEnumerable 无法做到这一点。获取元素始终是同步的。您需要使用像 IAsyncEnumerator 这样的异步模型。围绕这个想法似乎有合理的库 (asyncenum.codeplex.com)。没有内置任何东西。此外,与所有 ADO.NET 和 SQL 工作相比,缓冲对性能的影响非常小,因此避免缓冲对吞吐量没有任何有意义的影响。对于流式传输大量数据集仍然有意义。

标签: c# asynchronous io async-await .net-4.5


【解决方案1】:

我想做异步 I/O 调用(使用 async await) - 但是:

  • 不使用 DataFlow 依赖项(如本答案)
  • 没有中间缓冲区(不像这个答案)
  • Projector 函数应作为参数发送。 (不喜欢这个答案)

您可能想查看 Stephen Toub 的 "Tasks, Monads, and LINQ",了解有关如何处理异步数据序列的一些好主意。

(还)不可能将yieldawait 结合起来,但我将在这里成为一个口头表达者:引用的要求没有列出IEnumerable 和LINQ。所以,这是一个可能的解决方案,形成两个协程(几乎未经测试)。

数据生产者例程(对应于IEnumarableyield):

public async Task GetSomeDataAsync<T>(
    string sql, Func<IDataRecord, T> projector, ProducerConsumerHub<T> hub)
{
    using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using (SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            await _conn.OpenAsync();
            _cmd.CommandTimeout = 100000;
            using (var rdr = await _cmd.ExecuteReaderAsync())
            {
                while (await rdr.ReadAsync())
                    await hub.ProduceAsync(projector(rdr));
            }
        }
    }
}

数据消费者例程(对应于 foreach 或 LINQ 表达式):

public async Task ConsumeSomeDataAsync(string sql)
{
    var hub = new ProducerConsumerHub<IDataRecord>();
    var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub);

    while (true)
    {
        var nextItemTask = hub.ConsumeAsync();
        await Task.WhenAny(producerTask, nextItemTask);

        if (nextItemTask.IsCompleted)
        {
            // process the next data item
            Console.WriteLine(await nextItemTask);
        }

        if (producerTask.IsCompleted)
        {
            // process the end of sequence
            await producerTask;
            break;
        }
    }
}

协程执行助手(也可以实现成一对custom awaiters):

public class ProducerConsumerHub<T>
{
    TaskCompletionSource<Empty> _consumer = new TaskCompletionSource<Empty>();
    TaskCompletionSource<T> _producer = new TaskCompletionSource<T>();

    // TODO: make thread-safe
    public async Task ProduceAsync(T data)
    {
        _producer.SetResult(data);
        await _consumer.Task;
        _consumer = new TaskCompletionSource<Empty>();
    }

    public async Task<T> ConsumeAsync()
    {
        var data = await _producer.Task;
        _producer = new TaskCompletionSource<T>();
        _consumer.SetResult(Empty.Value);
        return data;
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}

这只是一个想法。对于像这样的简单任务来说,这可能是一种过度杀伤,并且可以在某些方面进行改进(如线程安全、竞争条件和在不触及producerTask 的情况下处理序列的结尾)。然而,它说明了如何将异步数据检索和处理解耦。

【讨论】:

  • Tnx。我不明白 GetSomeDataAsync 如何与 ConsumeSomeDataAsync 一起使用。都取sql命令??
  • @RoyiNamir, ConsumeSomeDataAsync 调用 GetSomeDataAsync 来启动序列并将sql 字符串传递给它。
  • 我可以做类似ConsumeSomeDataAsync().Where(...) 的事情吗? (目前没有)
  • @RoyiNamir,这里有一个简单的小提琴展示了这个概念:dotnetfiddle.net/8gQVt2。不,您将无法使用标准 LINQ。
  • @RoyiNamir,我想你可以这么说。形象地说,这是生产者/消费者模式的一个特例,队列大小为 1 个元素,并且没有阻塞任何线程。 TPL 数据流允许更复杂的场景。然后,还有 Rx。
【解决方案2】:

这个Medium article 描述了另一种解决方案,即使用Dasync/AsyncEnumerable 库。

该库是开源的,可在NuGetGitHub 上使用,并提供了一个可读的语法,现在可以使用IAsyncEnumerable,直到C# 8.0 comes out and provides its own implementation and language support,形式为async ... yield returnawait foreach

(我与图书馆没有任何联系;我认为它可能是一个非常有用的解决方案 - 我认为是! - 与您的问题相同,在我正在开发的项目中。)

【讨论】:

    猜你喜欢
    • 2017-11-05
    • 2019-04-15
    • 2014-06-19
    • 2018-01-14
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多