【发布时间】:2019-03-04 18:55:38
【问题描述】:
我是多线程编程的新手。我有一个程序需要查询数据库,然后对返回的数据执行一些数据操作。由于我的组织结构,我必须单独调用数据库以检索单个用户的帐户信息。我的任务涉及收集数千个帐户的数据。
目前,我正在使用 Parallel.ForEach() 来查询数据库并将所有元素添加到 ConcurrentList 中。从数据库返回所有数据后,我就会以同步方式执行我的操作。
除了任何明显的问题之外,我不喜欢的一件事是在内存中保留一个大列表并且基本上被阻塞,直到冗长的数据库进程完成。我希望能够将数据推送到队列中,然后在添加数据后立即开始处理数据。消费过程不需要是并行或异步的。我只需要它能够侦听何时将某些内容添加到队列或队列不为空。
并行处理:
public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
{
logger.Info("Fetching Data");
var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
{
try
{
var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);
concurrentCombinedData.Add(new CombinedAccountInfo()
{
AccountName = r.AccountName,
AccountId = r.AccountId,
LastLoginDate = r.LastLoginDate,
AccountHandle = r.AccountHandle,
UserPreferences = userPrefs
});
}
catch (Exception e)
{
logger.Error(e);
}
});
return concurrentCombinedTransaction;
}
我在 Dataflow 上做了一些阅读,并看到了一些关于 Reactive Extensions 的文章。但是,我似乎可以找到多个生产者向单个消费者提供数据的任何更简单的示例。任何关于如何更好地实现最终目标的建议或想法将不胜感激。
已解决
我将使用Scott Hannen 提供的答案。因为操作很小并且不是很密集,每个进程都可以处理它,而不是试图将所有内容重新绑定到一个列表中。
【问题讨论】:
标签: c# .net queue task-parallel-library producer-consumer