【问题标题】:Task Parallel Library mixed with async/await任务并行库与 async/await 混合
【发布时间】:2017-06-09 12:38:42
【问题描述】:

在 Web 应用程序中,我们为应用程序中的各种数据库表提供分页搜索面板。我们目前允许用户选择单独的行,并通过 UI 在每个选定的实例中执行一些操作。

例如,文档记录面板提供了删除文档的功能。用户可以选中代表 15 个文档标识符的 15 个复选框,然后选择选项 > 删除。这工作得很好。

我希望为用户提供一个选项,以便对与查询匹配的所有行执行一些操作,以在面板中显示数据。

我们可能有 5,000 个符合某些搜索条件的文档,并希望允许用户删除所有 5,000 个。 (我知道这个例子有点做作;让我们忽略允许用户批量删除文档的“智慧”!)

为数千行执行一个方法是一个长时间运行的操作,所以我将把操作排队。认为这相当于 Gmail 将过滤器应用于所有符合某些搜索条件的电子邮件对话的能力。

我需要执行一个返回未知行数的查询,并为每一行插入一行到队列中(在下面的代码中,队列由ImportFileQueue 表示)。

我编码如下:

using (var reader = await source.InvokeDataReaderAsync(operation, parameters))
{
    Parallel.ForEach<IDictionary<string, object>>(reader.Enumerate(), async properties =>
    {
        try
        {
            var instance = new ImportFileQueueObject(User)
            {
                // application tier calculation here; cannot do in SQL
            };
            await instance.SaveAsync();
        }
        catch (System.Exception ex)
        {
            // omitted for brevity
        }
    });
}

在使用事务包装调用的单元测试中运行此程序时,我收到 System.Data.SqlClient.SqlException: Transaction context in use by another session. 错误。

这很容易解决:

  • 将数据库调用从异步更改为同步,或者
  • 删除 Parallel.Foreach,并以串行方式遍历阅读器。

我选择了前者:

using (var reader = await source.InvokeDataReaderAsync(operation, parameters))
{
    Parallel.ForEach<IDictionary<string, object>>(reader.Enumerate(), properties =>
    {
        try
        {
            var instance = new ImportFileQueueObject(User)
            {
                // Omitted for brevity
            };
            instance.Save();
        }
        catch (System.Exception ex)
        {
            // omitted for brevity
        }
    });
}

在典型的用例中,我的思考过程是:

  • 外部阅读器通常会有数千行
  • instance.Save() 调用是“轻量级”;在数据库中插入一行

两个问题:

  1. 有没有合理的方式在Parallel.Foreach内部使用async/await,其中内部代码使用SqlConnection(避免TransactionContext错误)
  2. 如果不是,考虑到我预期的典型用例,我选择利用 TPL 并放弃 async/await 以实现合理的单行保存

What is the reason of “Transaction context in use by another session” 中建议的答案是:

尽可能避免多线程数据操作(无论 加载或保存)。例如。将 SELECT/UPDATE/ 等请求保存在 单个队列并使用单线程工作器为它们提供服务;

但我试图尽量减少总执行时间,并认为Parallel.Foreach 更有可能减少执行时间。

【问题讨论】:

  • 你为什么要使用Parallel?批量修改 N 行通常比尝试单独修改它们快 N 倍。如果您的更新速度很慢,请修复您的数据访问方法。例如,使用批处理来发送 一个 组命令,而不是多个单独的命令。
  • @EricPatrick 您的实际问题是什么?为什么要尝试并行插入记录?
  • @EricPatrick 插入时为什么要并发?插入大量数据的最快方法是使用批量加载,并将数据作为流发送到数据库,以网络和磁盘可以处理的速度最快。您也可以使用 SqlBulkCopy 在客户端执行此操作。 并发意味着锁定和争用,这就是为什么它会导致降低吞吐量。毕竟,您有一张网卡并写入同一个存储。同一网络、磁盘、CPU资源的并发连接内容
  • @EricPatrick 批量加载与发送一批命令相同。数据库使用最少的日志记录,即它不会记录每一个 INSERT 命令。它将修改后的数据页复制到日志中,从而减少 IO 操作。
  • @EricPatrick 并行加载唯一有用的方法是,如果它不会导致争用,或者至少会导致最小的争用。在服务器端,这意味着写入不同的表,或同一张表的不同分区。每个操作只会锁定一个分区。

标签: c# sql-server task-parallel-library


【解决方案1】:

打开一个事务然后在等待 I/O 的同时保持它打开几乎总是一个坏主意。通过首先缓冲数据,您将获得更好的性能(以及更少的死锁)。如果总数据多于您可以轻松缓冲在内存中的数据,则一次将其缓冲成一千左右行的块。如果可能,将它们中的每一个放在单独的事务中。

每当您打开事务时,所占用的所有锁都会保持打开状态,直到它被提交(并且在您插入数据时,无论您是否愿意,锁都会被占用)。这些锁会导致没有WITH(NOLOCK) 的其他更新或读取等待事务提交。在高性能系统中,如果您在持有这些锁的同时进行 I/O,则几乎可以肯定会导致问题,因为其他调用者开始一个操作,然后在此操作在事务之外执行 I/O 时坐下来等待.

【讨论】:

  • 本例中的事务来自一个测试套件,是测试完成后回滚插入数据的糖。我确实可以选择不将整个事物包装在 PROD/真实世界环境中的事务中。尽管如此,我想了解混合 Parallel.Foreach 与 async/await 的影响
  • 问题是你正在共享物理资源,比如线程之间的连接。每个线程打开一个连接是更好的方法,因此它们将是独立的。
猜你喜欢
  • 1970-01-01
  • 2016-12-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-09-04
  • 1970-01-01
  • 2018-06-25
  • 2019-11-11
相关资源
最近更新 更多