【问题标题】:Parallel Linq - return first result that comes backParallel Linq - 返回第一个返回的结果
【发布时间】:2011-11-11 20:22:27
【问题描述】:

我正在使用 PLINQ 运行一个测试串行端口以确定它们是否是 GPS 设备的函数。

立即发现某些串行端口是有效的 GPS。在这种情况下,我希望第一个完成测试的人是返回的人。我不想等待剩下的结果。

我可以使用 PLINQ 执行此操作,还是必须安排一批任务并等待一个任务返回?

【问题讨论】:

  • 我正要发布这个确切的问题并找到了这个。不幸的是,接受的答案是不正确的。这在 4.0 中是不可能的吗(我可以看到在 4.5 中是可能的)。

标签: c# linq plinq


【解决方案1】:

PLINQ 在这里可能不够用。虽然您可以在 .NET 4 中使用.First,但这将导致它按顺序运行,从而达不到目的。 (注意这个will be improved in .NET 4.5。)

然而,TPL 很可能是这里的正确答案。您可以为每个串口创建一个Task<Location>,然后使用Task.WaitAny等待第一次成功操作。

这提供了一种简单的方法来安排一堆“任务”,然后只使用第一个结果。

【讨论】:

  • 这个问题是我现在需要等待每个任务,查看结果,如果结果是肯定的则取消剩余的任务,或者如果结果为否定则继续等待,同时仍然是如果没有任务,请注意不要等待。对于这样一个看似简单的任务,它的逻辑相当复杂;有没有更好的办法?
  • @DavidPfeffer 您可以将结果推送到BlockingCollection<T>,并有一个线程调用它GetConsumingEnumerable...一旦您获得“有效”结果,就触发取消令牌并使用它。
  • 没有端口有效的情况怎么办?
  • @DavidPfeffer 当你走到最后,如果你还没有找到任何东西,你就会知道没有什么是有效的......
【解决方案2】:

过去几天我一直在思考这个问题,但我在 C# 4.0 中找不到内置的 PLINQ 方法来执行此操作。使用 FirstOrDefault 这个问题的公认答案在完整的 PLINQ 查询完成之前不会返回值,并且仍然返回(有序的)第一个结果。以下极端示例显示了该行为:

var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
    .WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
    .Where(i => i % 2 == 0 )
    .Select( i =>
    {
        if( i == 0 )
            Thread.Sleep(3000);
        else
            Thread.Sleep(rnd.Value.Next(50, 100));
        return string.Format("dat {0}", i).Dump();
    });

cts.CancelAfter(5000);

// waits until all results are in, then returns first
q.FirstOrDefault().Dump("result");

我没有看到立即获得第一个可用结果的内置方法,但我想出了两个解决方法。

第一个创建任务来完成工作并返回任务,从而快速完成 PLINQ 查询。生成的任务可以传递给 WaitAny,以便在它可用时立即获得第一个结果:

var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
    .WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
    .Where(i => i % 2 == 0 )
    .Select( i =>
    {
        return Task.Factory.StartNew(() =>
        {
        if( i == 0 )
            Thread.Sleep(3000);
        else
            Thread.Sleep(rnd.Value.Next(50, 100));
        return string.Format("dat {0}", i).Dump();
        });
    });

cts.CancelAfter(5000);

// returns as soon as the tasks are created
var ts = q.ToArray();

// wait till the first task finishes
var idx = Task.WaitAny( ts );
ts[idx].Result.Dump("res");

这可能是一种糟糕的方式。由于 PLINQ 查询的实际工作只是一个非常快的 Task.Factory.StartNew,所以使用 PLINQ 完全没有意义。 IEnumerable 上的简单 .Select( i =&gt; Task.Factory.StartNew( ... 更简洁,可能更快。

第二种解决方法使用队列 (BlockingCollection),并在计算结果后将结果插入此队列:

var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
    .WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
    .Where(i => i % 2 == 0 )
    .Select( i =>
    {
        if( i == 0 )
            Thread.Sleep(3000);
        else
            Thread.Sleep(rnd.Value.Next(50, 100));
        return string.Format("dat {0}", i).Dump();
    });

cts.CancelAfter(5000);

var qu = new BlockingCollection<string>();

// ForAll blocks until PLINQ query is complete
Task.Factory.StartNew(() => q.ForAll( x => qu.Add(x) ));

// get first result asap
qu.Take().Dump("result");

使用此方法,工作是使用 PLINQ 完成的,并且 BlockingCollecion 的 Take() 将在 PLINQ 查询插入后立即返回第一个结果。

虽然这会产生预期的结果,但我不确定它是否比仅使用更简单的 Tasks + WaitAny 有任何优势

【讨论】:

    【解决方案3】:

    经过进一步审查,您显然可以使用FirstOrDefault 来解决这个问题。默认情况下,PLINQ 不会保留排序,并且对于无缓冲查询,将立即返回。

    http://msdn.microsoft.com/en-us/library/dd460677.aspx

    【讨论】:

    • 我没有看到这种行为。 FirstOrDefault 仍然返回 PLINQ 查询中第一个输入的结果(即使使用 AsUnordered)。
    【解决方案4】:

    完全使用 .NET 4.0 中的 PLINQ 完成此任务:

    SerialPorts.                        // Your IEnumerable of serial ports
        AsParallel().AsUnordered().     // Run as an unordered parallel query
        Where(IsGps).                   // Matching the predicate IsGps (Func<SerialPort, bool>)
        Take(1).                        // Taking the first match
        FirstOrDefault();               // And unwrap it from the IEnumerable (or null if none are found
    

    关键是不要使用像 First 或 FirstOrDefault 这样的有序评估,直到您指定您只关心找到一个。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-01-26
      • 2014-04-17
      • 2016-05-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多