过去几天我一直在思考这个问题,但我在 C# 4.0 中找不到内置的 PLINQ 方法来执行此操作。使用 FirstOrDefault 这个问题的公认答案在完整的 PLINQ 查询完成之前不会返回值,并且仍然返回(有序的)第一个结果。以下极端示例显示了该行为:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
cts.CancelAfter(5000);
// waits until all results are in, then returns first
q.FirstOrDefault().Dump("result");
我没有看到立即获得第一个可用结果的内置方法,但我想出了两个解决方法。
第一个创建任务来完成工作并返回任务,从而快速完成 PLINQ 查询。生成的任务可以传递给 WaitAny,以便在它可用时立即获得第一个结果:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
return Task.Factory.StartNew(() =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
});
cts.CancelAfter(5000);
// returns as soon as the tasks are created
var ts = q.ToArray();
// wait till the first task finishes
var idx = Task.WaitAny( ts );
ts[idx].Result.Dump("res");
这可能是一种糟糕的方式。由于 PLINQ 查询的实际工作只是一个非常快的 Task.Factory.StartNew,所以使用 PLINQ 完全没有意义。 IEnumerable 上的简单 .Select( i => Task.Factory.StartNew( ... 更简洁,可能更快。
第二种解决方法使用队列 (BlockingCollection),并在计算结果后将结果插入此队列:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
cts.CancelAfter(5000);
var qu = new BlockingCollection<string>();
// ForAll blocks until PLINQ query is complete
Task.Factory.StartNew(() => q.ForAll( x => qu.Add(x) ));
// get first result asap
qu.Take().Dump("result");
使用此方法,工作是使用 PLINQ 完成的,并且 BlockingCollecion 的 Take() 将在 PLINQ 查询插入后立即返回第一个结果。
虽然这会产生预期的结果,但我不确定它是否比仅使用更简单的 Tasks + WaitAny 有任何优势