【问题标题】:Can PLINQ generate two streams? An error stream and a data streamPLINQ 可以生成两个流吗?一个错误流和一个数据流
【发布时间】:2017-04-07 20:51:20
【问题描述】:

我正在使用 PLINQ 处理一个大 (700MB+) CSV 文件。这是查询:

var q = from r in ReadRow(src).AsParallel()
    where BoolParser.Parse(r[vacancyIdx])
    select r[apnIdx];

如果您想知道,它会为空置属性生成 APN 列表。

我的问题是,如何在不对查询/流执行 2 次传递的情况下提取“坏记录”流?

CSV 文件中的每一行都应包含 colCount 记录。我想通过将 where 子句更改为“where r.Count == colCount && BoolParser.Parse(r[vacancyIdx])”来强制执行此操作。

但是,任何格式错误的输入都会默默消失。

我需要在错误日志中捕获任何格式错误的行,并标记 n 行输入未被处理。

目前我在 ReadRow() 函数中完成这项工作,但似乎应该有一种 plinqy 方法将数据流拆分为 2 个或多个要处理的流。

有人知道怎么做吗?如果没有,有谁知道如何将此建议添加到 PLINQ 新功能请求中? ;-)

【问题讨论】:

    标签: c# csv plinq


    【解决方案1】:

    您的要求没有多大意义,因为 PLINQ 是基于“拉”模型(即消费者决定何时消费商品)。考虑如下代码(为简洁起见,使用 C# 7 元组语法):

    var (good, bad) = ReadRow(src).AsParallel().Split(r => r.Count == colCount);
    
    foreach (var item in bad)
    {
        // do something
    }
    
    foreach (var item in good)
    {
        // do something else
    }
    

    Split的实现有两种选择:

    1. 当当前项目属于另一个流时阻止一个流。

      在上面的例子中,一旦第一个好的项目出现,这就会导致死锁。

    2. 在读取另一个流时缓存一个流的值。

      在上面的示例中,假设绝大多数项目都是好的,这将导致在两个 foreach 循环之间的那一刻,大约 700 MB 的数据保留在内存中。所以这也是不可取的。

    所以,我认为您在ReadRow 中的解决方案是可以的。

    另一种选择是:

    where CheckCount(r) && BoolParser.Parse(r[vacancyIdx])
    

    在这里,CheckCount 方法报告它发现的任何错误并为它们返回false。 (如果您这样做,请确保报告线程安全。)


    如果您仍想建议在 PLINQ 中添加类似的内容,或者只是讨论选项,您可以在 the corefx repository 中创建问题。

    【讨论】:

    • 真的感谢您花时间阅读和评论。但是,我确实有一个拉模型。当你说它“没有多大意义”时,我不确定你的意思。你的回答表明你明白我在问什么。 ;-) 您使用两个不同的 foreach 循环提出的第一个解决方案看起来会在数据集上迭代 2 次。这是行不通的,因为每次通过几乎都需要一分钟,而且未来的数据集可能会更大。我会编写代码并测试。如果它只传递一次数据,那是一个很好的解决方案。
    • 您使用 where 子句的第二个解决方案更接近我正在寻找的内容。事实上,我要窃取你的代码,但将 CheckCount() 重命名为 IsValid(r)。谢谢!但是,我仍然在寻找一种使用 linq/plinq 的方法来创建 2 个(或更多)流,只需一次传递数据。我现在就去corefx。谢谢指点。
    • @MarkMoore 在我的第一个解决方案中,我试图解释如果你只迭代一次,你必须有两个致命缺陷之一。这就是为什么它没有意义。
    猜你喜欢
    • 2013-11-25
    • 2020-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-13
    相关资源
    最近更新 更多