【问题标题】:Unit test async methods C#单元测试异步方法 C#
【发布时间】:2017-07-14 09:08:19
【问题描述】:

我有问题)

我尝试在我的 c# 代码中重现几个 sp(存储过程)调用,但我想以异步方式进行。

TSQL 示例: (Execute sp @key = 15072000173475; Execute sp @key = 15072000173571; ... Execute sp @key = n;)

[TestClass]
public class UnitTestNomenclature {
    [TestMethod]
    public void ParallelSQLMethod() {
        Task scropeTasks = null;
        //real amount is more then 1500
        long[] keys = new long[] {15072000173475,15072000173571 ... n };

        try {
            var tasks = keys.Select( i =>  Task.Run(async () => { await RunStoredProc(i); }));
            scropeTasks =  Task.WhenAll(tasks);

            scropeTasks.Wait();
        } catch (Exception ex) {
            Debug.WriteLine("Exception: " + ex.Message);

            Debug.WriteLine("IsFaulted: " + scropeTasks.IsFaulted);
            foreach (var inx in scropeTasks.Exception.InnerExceptions) {
                Debug.WriteLine("Details: " + inx.Message);
            }
        }

        Assert.AreEqual(1, 1);
    }

    public async Task RunStoredProc(long scollNumbParam) {
        const string strStoredProcName = @"[dbo].[sp]";
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) {
            await conn.OpenAsync();
            Debug.WriteLine("============================================ Connection is open: ==============================================");

            // info
            Debug.WriteLine(String.Format("Connection: {0}", conn.ClientConnectionId));
            Debug.WriteLine(String.Format("State: {0}", conn.State.ToString()));

            using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure }) {

                SqlParameter scrParam = new SqlParameter() {
                    ParameterName = "@KEYKRT",
                    Value = scollNumbParam,
                    SqlDbType = SqlDbType.BigInt
                };
                cmd.Parameters.Add(scrParam);

                Debug.WriteLine("Start of Proccesing: " + scollNumbParam);
                await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
                Debug.WriteLine("End of Proccesing: " + scollNumbParam);

            }
        }

        Debug.WriteLine("============================================ Connection is closed: ==============================================");
    }
}

这是我在输出窗口中得到的:

========== Connection is open: ========
Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
========== Connection is open: ==========
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
.....
End of Proccesing: 15072000173475
=========== Connection is closed: =========
End of Proccesing: 15072000173571
=========== Connection is closed: =========

....

A timeout occurred while waiting for memory resources to execute the query in resource pool 'default' (2). Rerun the query.
Actual error number: 8645
Actual line number: 98

还调试说连接池溢出 我认为连接没有适当处理的主要原因,但是我如何通过异步来实现呢?

如果我尝试在声明异步任务之前只打开一个连接并将其传递给我的 RunStoredProc 方法,那么我得到 连接不支持 MultipleActiveResultSets

using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;)) {

                    conn.OpenAsync();
                    var tasks = keys.Select(i => Task.Run(async () => { await RunStoredProc(i, conn); }));
                    scropeTasks = Task.WhenAll(tasks);

                    scropeTasks.Wait();
                }

                Debug.WriteLine("========== Connection is closed: ==========");

这是我在输出窗口中得到的:

Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
======= Connection is open: =============
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
========= Connection is open: =========

【问题讨论】:

  • 是什么阻止了您使测试异步?
  • 没什么,我做了,得到了以上错误。我很想知道为什么会出现这个错误
  • 因为你有 1500 个左右的任务同时执行,并且混合了异步和阻塞调用,这可能导致死锁。你预计会发生什么?
  • 我希望得到异步处理。我正在寻找解决方案。我如何在每次迭代后实现 conenction dispose 事件?可能吗?我试图弄清楚 async/await + ado.net 如何协同工作
  • 按顺序迭代它们。这将花费更长的时间,但至少连接将得到正确处理,以免资源过载。你也可以考虑分批进行。

标签: c# unit-testing asynchronous ado.net async-await


【解决方案1】:

您有 1500 个左右的任务同时执行,并且混合了异步和阻塞调用(例如 .Wait),这可能会导致死锁。

使测试异步并尽量避免async void,除非它在事件处理程序上。

尝试按顺序迭代它们。这将花费更长的时间,但至少连接将得到正确处理,以免资源过载。你也可以考虑分批进行。

[TestMethod]
public async Task ParallelSQLMethod() {
    //real amount is more then 1500
    var keys = new long[] { 
        15072000173475, 
        15072000173571, 
        //....., n
    };
    var tasks = keys.Select(i => RunStoredProc(i));
    var batchSize = 50; //Or smaller

    //run tasks in batches
    var sequence = tasks;
    while (sequence.Any()) {
        var batch = sequence.Take(batchSize);
        sequence = sequence.Skip(batchSize);

        await Task.WhenAll(batch);
    }
}

【讨论】:

    【解决方案2】:

    恐怕我在这里看到了 async/await/concurrent/threading 的经典问题。测试有很多问题,我会一一尝试。

    1) 测试用例架构。您不知道您正在编写的单元测试和 SQL 服务器是位于同一个盒子还是不同的盒子上。

    如果在同一个盒子上,我会选择 Max(n_cores/2, 1) 个连接。

    如果不同的盒子,我会选择 1-3 个连接。

    这些数字可以根据存储过程行为、长/短计算、传输的数据量、连接速度等进行上下调整。

    2) SQL 连接并发问题。您不能打开一个连接,然后以某种方式尝试通过此连接同时调用 1500 个请求。实际上甚至没有两个同时发生。

    这就是它告诉你的:连接不支持 MultipleActiveResultSets。

    您必须同时使用一个打开的连接供一个请求使用。

    但是!您不必仅将它用于一个请求并关闭它,您可以在第一个请求完成后运行下一个请求,这比关闭并创建新连接要快得多。您只需通过每个连接按顺序运行这些请求...

    3) 所以正确的测试用例架构应该是这样的:

    • 一种异步测试方法,
    • 这会将所有键推送到 ConcurrentQueue 队列;
    • 然后 Task[] 任务数组的大小取决于所需的连接数,
    • 启动每个任务并将它们存储到数组中,
    • 等待Task.WhenAll(tasks);

    我非常喜欢玩并发/并行代码,但是在不协调它们的情况下制作越来越多的任务无助于加快速度,反而浪费了资源......

    4) 示例:

    [TestClass]
    public class UnitTestNomenclature
    {
        [TestMethod]
        public async Task ParallelSQLMethod()
        {
            long[] keys = new long[] { 15072000173475, 15072000173571 };
    
            ConcurrentQueue<long> queue = new ConcurrentQueue<long>(keys);
    
            int connections = Math.Max(1, Environment.ProcessorCount / 2);
    
            Task[] tasks =
            Enumerable
            .Range(0, connections)
            .Select(i => Task.Run<Task>(() => RunConnection(i, queue)).Unwrap())
            .ToArray()
            ;
    
            await Task.WhenAll(tasks);
        }
    
        public async Task RunConnection(int connection, ConcurrentQueue<long> queue)
        {
            using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;"))
            {
                await conn.OpenAsync();
                Debug.WriteLine($"====== Connection[{connection}] is open: ======");
    
                Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
                Debug.WriteLine($"Connection[{connection}].State: {conn.State}");
    
                long scollNumbParam;
    
                while (queue.TryDequeue(out scollNumbParam))
                {
                    await RunStoredProc(conn, connection, scollNumbParam);
                    Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
                    Debug.WriteLine($"Connection[{connection}].State: {conn.State}");
                }
            }
    
            Debug.WriteLine($"====== Connection[{connection}] is closed  ======");
        }
    
        public async Task RunStoredProc(SqlConnection conn, int connection, long scollNumbParam)
        {
            const string strStoredProcName = @"[dbo].[sp]";
    
            using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure })
            {
                SqlParameter scrParam = new SqlParameter()
                {
                    ParameterName = "@KEYKRT",
                    Value = scollNumbParam,
                    SqlDbType = SqlDbType.BigInt
                };
                cmd.Parameters.Add(scrParam);
    
                Debug.WriteLine($"Connection[{connection}] Start of Proccesing: " + scollNumbParam);
                await cmd.ExecuteNonQueryAsync();
                Debug.WriteLine($"Connection[{connection}] End of Proccesing: " + scollNumbParam);
            }
        }
    }
    

    【讨论】:

      【解决方案3】:

      我用我的代码做了一些实验并获得了适当的结果(异步处理)。我更改了连接链接(添加:Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true),即我增加了连接池连接持续时间的大小。

      • 最大连接池大小(Max Pool Size)

      • 一个连接池中的最小连接数(Min Pool Size)

      • 在连接池中保持连接的秒数 (连接生命周期)(0 是最大值)

      提示:过多的池 Max Pool Size(默认为 100)可能会挂起您的服务器(我是这样做的:))

      我还注意到我没有收到异常 'connection doesn't support MultipleActiveResultSets' With 'MultipleActiveResultSets=true' in my connection string,但处理是同步。您可以在

      上阅读有关 (MARS) 的信息

      结论:服务器上的并行执行不是 MARS 功能,MARS 操作不是线程安全的。 MARS 并非旨在消除应用程序中多个连接的所有要求。如果应用程序需要对服务器真正并行执行命令,应该使用多个连接。它通常用于此类原因

      【讨论】:

      • 是的,通过一个渠道进行并发通信并非易事。我研究过很少的消息队列系统,我通常对这些系统处理并发感到非常失望。做这些事情并不容易,因为工具箱里充满了笨重的解决方案。可能 MARS 功能是在顺序技术和 2005 年的基础上移植的,我认为很有可能。
      猜你喜欢
      • 2020-07-14
      • 2011-11-07
      • 1970-01-01
      • 2021-09-19
      • 1970-01-01
      • 2019-08-02
      • 2017-12-21
      • 1970-01-01
      • 2014-08-21
      相关资源
      最近更新 更多