【问题标题】:Running asynchronous queries parallelly (IAsyncEnumerable)并行运行异步查询 (IAsyncEnumerable)
【发布时间】:2021-04-30 11:22:09
【问题描述】:

下面的代码连接到数据库并查询文章信息,然后将它们的计数存储在混乱的存储中。我想并行运行这些查询。现在QueryItems 首先运行,因为我先等待那个foreach。只有这样QueryCount才能运行。

我的问题是:如何并行运行这些异步查询?

internal async void InitializeItems()
{
    await foreach (Item i in this.QueryItems())
    {
        this.items.Add(i);
        this.view.ComboBoxArticle.Items.Add(i.ArticleNumber);
    }

    await foreach (ValueTuple<int, string> t in this.QueryCount())
        this.items.Where(it => it.ArticleNumber.Equals(t.Item2))
            .FirstOrDefault().Count = t.Item1;
}

private async IAsyncEnumerable<Item> QueryItems()
{
    using (OdbcConnection conItems = new OdbcConnection(
        Properties.Settings.Default.PaConnString))
    using (OdbcCommand cmdItems = new OdbcCommand())
    {
        cmdItems.Connection = conItems;
        cmdItems.CommandText = @"SELECT PUB.S_Artikel.Artikel AS ArticleNumber, 
        PUB.S_ArtikelSpr.Bezeichnung AS ArticleName, 
        PUB.S_ArtKtoGr.KontenGruppe AS CapacityGroup, 
        PUB.S_ArtKtoGr.Verbrauchskonto AS CostCenter, 
        PUB.S_KontoSpr.Bezeichnung AS CostName, 
        PUB.S_MengenEinheitSpr.Bezeichnung AS Unit
    FROM PUB.S_Artikel 
        INNER JOIN PUB.S_ArtikelSpr 
            ON PUB.S_Artikel.Artikel = PUB.S_ArtikelSpr.Artikel 
                AND PUB.S_ArtikelSpr.Sprache = 'H' 
        INNER JOIN PUB.SBM_ValueFlowGroup 
            ON PUB.S_Artikel.SBM_ValueFlowGroup_Obj
                = PUB.SBM_ValueFlowGroup.SBM_ValueFlowGroup_Obj 
        INNER JOIN PUB.S_ArtKtoGr 
            ON PUB.SBM_ValueFlowGroup.S_ArtKtoGr_Obj = PUB.S_ArtKtoGr.S_ArtKtoGr_Obj
        INNER JOIN PUB.S_KontoSpr 
            ON PUB.S_ArtKtoGr.Verbrauchskonto = PUB.S_KontoSpr.Konto
                AND PUB.S_KontoSpr.Sprache = 'H'
        INNER JOIN PUB.S_MengenEinheitSpr 
            ON PUB.S_Artikel.LagerME = PUB.S_MengenEinheitSpr.MengenEinheit 
                AND PUB.S_MengenEinheitSpr.Sprache = 'H' " +
    "WHERE PUB.S_Artikel.Firma = '100' " + "AND PUB.S_Artikel.archiviert = 0 " +
    "ORDER BY PUB.S_Artikel.Artikel";

        conItems.Open();
        if (!conItems.State.Equals(ConnectionState.Open))
            throw new Exception(
                "Nem sikerült a cikkinformációk lekérdezése (a PA szerverről).");

        using (DbDataReader readerItems = await Task.Run(
            () => cmdItems.ExecuteReader()))
        {
            while (await Task.Run(() => readerItems.ReadAsync()))
            {
                yield return new Item
                    (
                        readerItems.GetString(0),
                        readerItems.GetString(1),
                        readerItems.GetString(2),
                        readerItems.GetString(3),
                        readerItems.GetString(4),
                        readerItems.GetString(5)
                    );
            }
            await Task.Run(() => MessageBox.Show("ITEMS END"));
        }
    }
}

private async IAsyncEnumerable<ValueTuple<int, string>> QueryCount()
{
    using (OdbcConnection conCount = new OdbcConnection(
        Properties.Settings.Default.PaConnString))
    using (OdbcCommand cmdCount = new OdbcCommand())
    {
        cmdCount.Connection = conCount;
        cmdCount.CommandText = @"SELECT SUM(PUB.MP_ArtPlatz.Bestand),
            PUB.MP_ArtPlatz.Artikel AS ArticleNumber
        FROM PUB.MP_ArtPlatz
        GROUP BY PUB.MP_ArtPlatz.Artikel";

        conCount.Open();
        if (!conCount.State.Equals(ConnectionState.Open))
            throw new Exception("Nem skerült a ProAlpha adatbázisához"
                + " kapcsolódni a darabszám lekérdezése érdekében.");

        using (DbDataReader readerCount = await Task.Run(
            () => cmdCount.ExecuteReaderAsync()))
            while (await Task.Run(() => readerCount.ReadAsync()))
            {
                //{
                //    Item i = this.items.Where(
                //        it => it.ArticleNumber.Equals(artNum)).FirstOrDefault();
                //    i.Count = count;
                //}
                yield return new ValueTuple<int, string>(
                    readerCount.GetInt32(0), readerCount.GetString(1));
            }
    }
}

在 UserControl 的加载方法中,我这样称呼它们:

private void MaterialRequestUC_Load(object sender, EventArgs e)
{
    try
    {
        //...
        this.presenter.InitializeItems();
        //...
    }
    catch (Exception ex) { MessageBox.Show(
        "Hiba: " + ex.Message, "Hiba", MessageBoxButtons.OK, MessageBoxIcon.Error); }
}

【问题讨论】:

  • “caotic storage” 嗨,Ferenc。这是什么意思?
  • SQL Server 是多线程的,所有查询都会根据机器中的内核数自动并行运行。
  • 杂乱无章的存储意味着并非每个项目在存储中都有固定的位置(行、列和楼层)。但是,一种类型的项目可以同时位于多个位置。但是,这些数据可以明确地从数据库中检索。而且我必须处理所有项目(QueryItems())并且需要存储中每个项目的计数(QueryCount)。但是在我的软件中,我放置的所有断点都证明,首先,查询项目并填充组合框。并且只有在第一次 foreach 完成后才会查询项目的总和。
  • this.items 字段的类型是什么?
  • 整个项目很大,复制到这里... QueryItems的结果集的每条记录都是从类型Item(自己的类型)开始的。有几个数据成员,你可以在代码中看到它。

标签: c# sql multithreading async-await parallel-processing


【解决方案1】:

如果您的意思是希望查询计数并行工作,请使用Task.Run 而不是await

internal async void InitializeItems()
{
    await foreach (Item i in this.QueryItems())
    {
        this.items.Add(i);
        this.view.ComboBoxArticle.Items.Add(i.ArticleNumber);
    }

    Task.Run(async () => await foreach (ValueTuple<int, string> t in this.QueryCount())
        this.items.Where(it => it.ArticleNumber.Equals(t.Item2))
            .FirstOrDefault().Count = t.Item1);
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-10-24
    • 1970-01-01
    • 2011-12-03
    • 1970-01-01
    • 1970-01-01
    • 2021-12-29
    • 2016-12-02
    • 2015-08-10
    相关资源
    最近更新 更多