【问题标题】:How to PLINQ an existing LINQ query with Joins?如何使用联接对现有的 LINQ 查询进行 PLINQ?
【发布时间】:2026-01-10 03:10:01
【问题描述】:

我正在使用 LINQ 将两个数据集相互比较以创建新行并更新现有行。我注意到完整的比较持续了大约 1.5 小时,并且两个内核中只有一个处于忙碌状态(任务管理器的 CPU 使用率为 50-52%)。我必须承认我对并行 LINQ 完全陌生,但我认为它可以显着提高性能。

所以我的问题是,我应该如何以及什么并行化?

这些是原始查询(简化为基本):

'check for new data
Dim srcUnique = From row In src.Email_Total
                Select Ticket_ID = row.ticket_id, Interaction = row.interaction, ModifiedAt = row.modified_time

Dim destUnique = From row In dest.ContactDetail
                 Where row.ContactRow.fiContactType = emailContactType.idContactType
                 Select row.ContactRow.Ticket_ID, row.Interaction, row.ModifiedAt

'get all emails(contactdetails) that are in source but not in destination
Dim diffRows = srcUnique.Except(destUnique).ToList

'get all new emails(according to ticket_id) for calculating contact columns
Dim newRowsTickets = (From row In src.Email_Total
                     Join d In diffRows
                     On row.ticket_id Equals d.Ticket_ID _
                     And row.interaction Equals d.Interaction _
                     And row.modified_time Equals d.ModifiedAt
                     Group row By Ticket_ID = row.ticket_id Into NewTicketRows = Group).ToList

For Each ticket In newRowsTickets
     Dim contact = dest.Contact.FindByTicket_IDfiContactType(ticket.Ticket_ID, emailContactType.idContactType)
     If contact Is Nothing Then
          ' Create new Contact with many sub-queries on this ticket(omitted) ****'
          Dim newContact = Me.dest.Contact.NewContactRow
          dest.Contact.AddContactRow(newContact)
          contact = newContact
     Else
          ' Update Contact with many sub-queries on this ticket(omitted) '
     End If
     daContact.Update(dest.Contact)

     ' Add new ContactDetail-Rows from this Ticket(this is the counterpart of the src.Email_Total-Rows, details omitted) '
     For Each newRow In ticket.NewTicketRows
         Dim newContactDetail = dest.ContactDetail.NewContactDetailRow
         newContactDetail.ContactRow = contact
         dest.ContactDetail.AddContactDetailRow(newContactDetail)
     Next
     daContactDetails.Update(dest.ContactDetail)
Next

注意daContactdaContactDetailsSqlDataAdapterssourcedestDataSetsContactContactDetailDataTables,其中每个ContactDetail 属于一个联系人。

即使不是两个核心都使用 100% 的 CPU,我认为如果我将查询并行化会显着提高性能,因为第二个核心几乎是空闲的。 for each 也可能是优化的好地方,因为票证彼此不相关。所以我假设我可以循环使用多个线程并并行创建/更新记录。但是如何用 PLINQ 做到这一点?

旁注:正如我在 cmets 中提到的,到目前为止,性能对我来说并不是一个关键因素,因为服务器的唯一目的是同步 MySQL 数据库(在另一个服务器)与 MS SQL 服务器(在与此 Windows 服务相同的服务器上)。它充当由其他服务生成的报告的来源。但这些报告每天只生成一次。但除此之外,我对学习 PLINQ 很感兴趣,因为我认为这可能是一个很好的练习。 仅当目标数据库为空并且必须创建所有记录时才需要提到的 1,5h。如果两个数据库几乎同步,则此方法只需约 1 分钟。由于 email 只是几种联系类型中的一种(聊天+通话将超过 100 万条记录),因此在未来的性能将变得更加重要。我认为无论如何我都需要某种(LINQ)数据分页。

如果有不清楚的地方,我会相应地更新我的答案。提前致谢。


编辑:这是我的调查和尝试的结果:

问题:如何使用连接“PLINQ”现有的 LINQ 查询?

Answer:请注意,一些 LINQ 运算符是二进制的——它们接受两个 IEnumerables 作为输入。 Join 是这种运算符的完美示例。在这些情况下,最左侧数据源的类型决定了使用 LINQ 还是 PLINQ。因此,您只需在第一个数据源上调用 AsParallel 即可让您的查询并行运行:

IEnumerable<T> leftData = ..., rightData = ...;
var q = from x in leftData.AsParallel()
        join y in rightData on x.a == y.b
        select f(x, y);

但如果我按以下方式更改我的查询(注意AsParallel):

Dim newRowsTickets = (From row In src.Email_Total.AsParallel()
                                        Join d In diffRows
                                        On row.ticket_id Equals d.Ticket_ID _
                                        And row.interaction Equals d.Interaction _
                                        And row.modified_time Equals d.ModifiedAt
                                    Group row By Ticket_ID = row.ticket_id Into NewTicketRows = Group).ToList

编译器会抱怨我还需要将AsParallel 添加到正确的数据源中。所以这似乎是一个 VB.NET 问题或缺乏文档(文章来自 2007 年)。我假设是后者,因为(除了那篇可推荐的)文章还说您需要手动添加 System.Concurrency.dll,但实际上它是 .NET 4.0 Framework 和命名空间 Sytem.Threading.Tasks 的一部分。

我意识到我不会从并行化 Except 中受益,因为在顺序模式下查询速度足够快(即使两个集合中的行数几乎相同,这会导致最大比较次数,我得到了结果不到 30 秒)。但为了完整起见,我会在以后添加它。

所以我决定将for-each 并行化,这与使用 LINQ 查询一样简单,您只需在末尾添加 AsParallel()。 但我意识到我需要强制使用WithExecutionMode(ParallelExecutionMode.ForceParallelism) 进行并行处理,否则.NET 决定只为这个循环使用一个核心。我还想告诉 .NET 我希望使用尽可能多的线程,但不要超过 8 个:WithDegreeOfParallelism(8).

现在两个内核同时工作,但 CPU 使用率保持在 54%。

这是目前为止的 PLINQ 版本:

Dim diffRows = srcUnique.AsParallel.Except(destUnique.AsParallel).ToList

Dim newRowsTickets = (From row In src.Email_Total.AsParallel()
                        Join d In diffRows.AsParallel()
                        On row.ticket_id Equals d.Ticket_ID _
                        And row.interaction Equals d.Interaction _
                        And row.modified_time Equals d.ModifiedAt
                    Group row By Ticket_ID = row.ticket_id Into NewTicketRows = Group).ToList

For Each ticket In newRowsTickets.
                    AsParallel().
                      WithDegreeOfParallelism(8).
                       WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    '  blah,blah ...  '

    'add new ContactDetails for this Ticket(only new rows)
    For Each newRow In ticket.NewTicketRows.
                                AsParallel().
                                    WithExecutionMode(ParallelExecutionMode.Default)
        ' blah,blah ... '
    Next
    daContactDetails.Update(dest.ContactDetail)
Next

不幸的是,与顺序模式相比,使用AsParallel 没有任何性能优势:

for eachAsParallel(hh:mm:ss.mm):

09/29/2011 18:54:36: Contacts/ContactDetails created or modified. Duration: 01:21:34.40

没有:

09/29/2011 16:02:55: Contacts/ContactDetails created or modified. Duration: 01:21:24.50

有人可以解释一下这个结果吗?数据库在for each 中的写访问是否负责类似的时间?


以下是推荐阅读:

【问题讨论】:

  • 我假设您的枚举/集合是 LINQ-to-Objects 而不是 LINQ-to-SQL 实体?您是否尝试过将 .AsParallel() 添加到集合的末尾?
  • @Neil:是的,它是 LINQ-To-Dataset LINQ-To-Objects 的子形式(添加标签)。我刚刚注意到使用 PLINQ 优化它的可能性,所以我现在还没有进行太多测试。我已经意识到我可以简单地将.AsParallel 放在集合的末尾,但是因为我是这个话题的新手,所以我不知道哪种方式是最好的。我可以用.AsParaLell 包围整个查询,或者只用其中的数据源,有什么区别?
  • @TimSchmelter 您是否对 Linq 的每个部分进行了计时,还是 foreach 部分中的对象创建需要时间?
  • @msarchet:我必须承认我还没有时间测量时间,所以我只能猜测。目前在源数据表中大约有 250.000 行(从 MySQL 加载,经过比较、规范化等后将插入到 MS SQL-Server 中)。这取决于两个数据库与此 Windows 服务的同步频率,因此必须创建/更新多少行。对于空的目标 DataTable,for-each 将花费超过 90% 的时间,当两者几乎同步时,比较将花费超过 90%。
  • @Jim:Join 还使用哈希算法来链接表,请参阅 my question on this issue 和 Guffa 和 Thomas 的答案。预过滤数据源的问题在于它是一个没有外键(MyISAM)的非规范化 MySQL 数据库,不受我的控制。源代码还包含很多不一致、重复的内容,并且数据分布在多个表中(Open_Mails、Closed_Mails 等)。这就是我决定使用自己的(干净的)数据库并开发此同步服务的原因之一。

标签: .net vb.net linq linq-to-objects parallel-extensions


【解决方案1】:

有3点值得进一步研究,

  1. 不要使用 .toList()。我可能错了,但我认为使用 .ToList 这种方式不允许编译器优化查询,如果 可以进一步优化。
  2. 使用您自己的过滤操作来比较两者的数据 目的地。它可能会给您带来更好的性能。
  3. 看看你是否可以使用LinqDataview 来提供更好的 性能。

    我认为您在插入时不会获得 PLinq 的优势。查看this answer了解更多详情。

希望对您有所帮助。请询问您是否需要对上述任何一点进行澄清。

【讨论】: