【问题标题】:C# add to queue in parallel and listen for queue propagationC# 并行添加到队列并监听队列传播
【发布时间】:2019-03-04 18:55:38
【问题描述】:

我是多线程编程的新手。我有一个程序需要查询数据库,然后对返回的数据执行一些数据操作。由于我的组织结构,我必须单独调用数据库以检索单个用户的帐户信息。我的任务涉及收集数千个帐户的数据。

目前,我正在使用 Parallel.ForEach() 来查询数据库并将所有元素添加到 ConcurrentList 中。从数据库返回所有数据后,我就会以同步方式执行我的操作。

除了任何明显的问题之外,我不喜欢的一件事是在内存中保留一个大列表并且基本上被阻塞,直到冗长的数据库进程完成。我希望能够将数据推送到队列中,然后在添加数据后立即开始处理数据。消费过程不需要是并行或异步的。我只需要它能够侦听何时将某些内容添加到队列或队列不为空。

并行处理:

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
    {
        logger.Info("Fetching Data");
        var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
        Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
        {
            try
            {
                var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);

                concurrentCombinedData.Add(new CombinedAccountInfo()
                {
                    AccountName = r.AccountName,
                    AccountId = r.AccountId,
                    LastLoginDate = r.LastLoginDate,
                    AccountHandle = r.AccountHandle,
                    UserPreferences = userPrefs 
                });
            }
            catch (Exception e)
            {
                logger.Error(e);
            }
        });

        return concurrentCombinedTransaction;
    }

我在 Dataflow 上做了一些阅读,并看到了一些关于 Reactive Extensions 的文章。但是,我似乎可以找到多个生产者向单个消费者提供数据的任何更简单的示例。任何关于如何更好地实现最终目标的建议或想法将不胜感激。

已解决

我将使用Scott Hannen 提供的答案。因为操作很小并且不是很密集,每个进程都可以处理它,而不是试图将所有内容重新绑定到一个列表中。

【问题讨论】:

    标签: c# .net queue task-parallel-library producer-consumer


    【解决方案1】:

    如果您想要在从数据库中检索每个帐户时对其进行操作,那么您可以完全这样做,而不是向ConcurrentBag&lt;CombinedAccountInfo&gt; 添加元素。

    public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(
        List<AccountInfo> accountList, 
        string dbConnName,
        Action<CombinedAccountInfo> doSomethingWithTheAccountInfo)
    

    然后,当您从数据库中获取每个元素时,

    doSomethingWithTheAccountInfo(accountInfo);
    

    【讨论】:

    • 如果对数据的操作很短,那么它可能没有任何区别。另一种方法是将项目放入 ConcurrentQueue 并从该队列中读取一个单独的进程,但随后它将轮询队列中的项目。除非您遇到性能问题,否则我不会担心做任何复杂的事情来提高速度。
    • 我对使用 Actions 和 Funcs 不太熟悉。您是否建议我创建一个指向处理数据的方法的操作?我喜欢这个主意,它似乎回答了我的问题。这更具假设性,但可能会成为一个用例,如果我想在并行过程完成后返回一个值怎么办?例如,如果我想从每个帐户返回特定的用户偏好,我可以让并行任务返回一些东西吗?这需要使用 Func 吗?
    • 是的。 Action 和 Func 之间的唯一区别是,Action 表示没有返回值的方法,而 Func 返回一些东西。有时我使用委托,这就像声明一个方法签名并为其命名。我喜欢这样的原因是因为它明确了我传入的方法的用途。但我一直在这方面来回走动。
    【解决方案2】:

    虽然我真的认为您应该一次查询所有用户首选项,因为这将提高您的数据库的性能(真的BIG TIME),如果您想要这样的事情:

    public void Answer<T>(List<Guid> ids)
    {
        var stack = new ConcurrentStack<T>();
    
        Parallel.ForEach(ids, (id) =>
        {
            T value = GetData<T>(id);
    
            stack.Push(value);
        });
    
        Parallel.For(0, ids.Count, (i) =>
        {
            T item;
            while (!stack.TryPop(out item))
            {
                // sleep
            }
            Process(item);
        });
    }
    

    但是我有没有提到,我认为你不应该去那里?

    【讨论】:

    • 如果我有能力一次查询所有首选项,但是所有数据都在 SOA 后面,我必须调用 web 服务来检索它。我只能对每个帐户进行 SOA 调用,没有我可以通过的批次或帐户列表。感谢您的建议,我会在尝试之前看看是否有更好的解决方案。
    猜你喜欢
    • 2013-09-12
    • 2019-07-03
    • 1970-01-01
    • 1970-01-01
    • 2016-03-12
    • 1970-01-01
    • 2018-06-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多