【问题标题】:Buffering a LINQ query缓冲 LINQ 查询
【发布时间】:2013-11-18 10:39:49
【问题描述】:

最终编辑

我选择了 Timothy 的答案,但如果您想要一个利用 C# yield 语句的更可爱的实现,请查看 Eamon 的答案:https://stackoverflow.com/a/19825659/145757


默认情况下,LINQ 查询是延迟流式传输

ToArray/ToList 提供完全缓冲,但首先他们渴望,其次可能需要相当长的时间才能完成无限序列。 p>

有什么方法可以结合这两种行为:streamingbuffering 值在生成时即时生成,以便不会触发下一个查询已经查询到的元素的生成。

这是一个基本用例:

static IEnumerable<int> Numbers
{
    get
    {
        int i = -1;

        while (true)
        {
            Console.WriteLine("Generating {0}.", i + 1);
            yield return ++i;
        }
    }
}

static void Main(string[] args)
{
    IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0);

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }

    Console.WriteLine("==========");

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }
}

这是输出:

Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.
==========
Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.

生成代码被触发22次。

我希望它被触发 11 次,第一次迭代可枚举。

那么第二次迭代将受益于已经生成的值。

应该是这样的:

IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer();

对于那些熟悉 Rx 的人来说,这是一种类似于 ReplaySubject 的行为。

【问题讨论】:

  • 真正需要缓存的不是 LINQ,而是 IEnumerablealready on the internet 有几个例子。
  • 昨天在 reddit (here) 上出现了这种情况。我宁愿不窃取该作者的解决方案。
  • @ScottChamberlain:谢谢你的链接,谷歌不是我的朋友。
  • @AustinSalonen:疯狂的巧合,感谢您的链接。 :)
  • 这个的总称是“memoization”。请注意,这里的许多实现都处理了一些简单的情况,但不处理在一个完全完成之前枚举结果的多个枚举器,不处理不同枚举器的并行枚举,如果整个序列没有迭代,等等。要处理这些更复杂的问题,您最好使用现有的库实现。

标签: c# .net linq ienumerable


【解决方案1】:

IEnumerable&lt;T&gt;.Buffer()扩展方法

public static EnumerableExtensions
{
    public static BufferEnumerable<T> Buffer(this IEnumerable<T> source)
    {
        return new BufferEnumerable<T>(source);
    }
}

public class BufferEnumerable<T> : IEnumerable<T>, IDisposable
{
    IEnumerator<T> source;
    List<T> buffer;
    public BufferEnumerable(IEnumerable<T> source)
    {
        this.source = source.GetEnumerator();
        this.buffer = new List<T>();
    }
    public IEnumerator<T> GetEnumerator()
    {
        return new BufferEnumerator<T>(source, buffer);
    }
    public void Dispose()
    {
        source.Dispose()
    }
}

public class BufferEnumerator<T> : IEnumerator<T>
{
    IEnumerator<T> source;
    List<T> buffer;
    int i = -1;
    public BufferEnumerator(IEnumerator<T> source, List<T> buffer)
    {
        this.source = source;
        this.buffer = buffer;
    }
    public T Current
    {
        get { return buffer[i]; }
    }
    public bool MoveNext()
    {
        i++;
        if (i < buffer.Count)
            return true;
        if (!source.MoveNext())
            return false;
        buffer.Add(source.Current);
        return true;
    }
    public void Reset()
    {
        i = -1;
    }
    public void Dispose()
    {
    }
}

用法

using (var evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer())
{
    ...
}

评论

这里的关键是IEnumerable&lt;T&gt; source作为Buffer方法的输入只调用了一次GetEnumerator,不管Buffer的结果被枚举了多少次。 Buffer 结果的所有枚举器共享相同的源枚举器和内部列表。

【讨论】:

  • 它会立即完整地评估数字,甚至在使用 evenNumbers 之前
  • 好吧,正如我在无限序列中所说的那样,蒂莫西ToList 很长。 ;)
  • @sinelaw:正如你所说的“完全”,即使没有完成;)
  • @Pragmateek 我错过了这一点。我知道你想要什么并更新了答案。
  • @TimothyShields:感谢您的实施。我真的希望有一种标准的方法来做到这一点,但没有什么是完美的。你得到这个。 :)
【解决方案2】:

据我所知,没有内置的方法可以做到这一点,这 - 既然你提到了 - 有点令人惊讶(我的猜测是,考虑到人们想要使用此选项的频率,它是可能不值得分析代码以确保生成器每次都给出完全相同的序列)。

但是,您可以自己实现它。最简单的方法是在呼叫站点上,如

var evenNumbers = Numbers.Where(i => i % 2 == 0).
var startOfList = evenNumbers.Take(10).ToList();

// use startOfList instead of evenNumbers in the loop.

更一般和更准确地,您可以在生成器中执行此操作:创建一个List&lt;int&gt; cache,每次生成一个新号码时,将其添加到cache,然后再添加yield return。然后当你再次循环时,首先提供所有缓存的数字。例如

List<int> cachedEvenNumbers = new List<int>();
IEnumerable<int> EvenNumbers
{
  get
  {
    int i = -1;

    foreach(int cached in cachedEvenNumbers)
    {
      i = cached;
      yield return cached;
    }

    // Note: this while loop now starts from the last cached value
    while (true) 
    {
        Console.WriteLine("Generating {0}.", i + 1);
        yield return ++i;
    }
  }
}

我想如果您考虑足够长的时间,您可能会想出一个IEnumerable&lt;T&gt;.Buffered() 扩展方法的一般实现 - 再次,要求是枚举在调用之间不会改变,问题是它是否值得它。

【讨论】:

  • 我的回答提供了您所说的通用“Buffered”方法。
  • 感谢您的回答CompuChip,是的,这是我正在寻求的通用解决方案。无论如何+1。 :)
  • @TimothyShields 我看到你在我发布我的答案后编辑了你的答案。不错,谢谢!
【解决方案3】:

为此,您可以使用 F# 电源组中的 Microsoft.FSharp.Collections.LazyList&lt;&gt; 类型(是的,来自未安装 F# 的 C# - 没问题!)。它在 Nuget 包中FSPowerPack.Core.Community

特别是,您想调用LazyListModule.ofSeq(...),它返回一个LazyList&lt;T&gt;,它实现了IEnumerable&lt;T&gt;,并且是惰性和缓存的。

在您的情况下,使用只是......

var evenNumbers = LazyListModule.ofSeq(Numbers.Where(i => i % 2 == 0));
var cachedEvenNumbers = LazyListModule.ofSeq(evenNumbers);

虽然我个人更喜欢 var 在所有这些情况下,但请注意,这确实意味着编译时类型将比 IEnumerable&lt;&gt; 更具体 - 并不是说​​这可能是一个缺点。 F# 非接口类型的另一个优点是它们公开了一些使用普通 IEnumerable 无法有效执行的高效操作,例如 LazyListModule.skip

我不确定LazyList 是否是线程安全的,但我怀疑它是。


在下面的 cmets 中指出的另一个替代方法(如果您安装了 F#)是 SeqModule.Cache(命名空间 Microsoft.FSharp.Collections,它将在 GACed 程序集 FSharp.Core.dll 中),它具有相同的有效行为。与其他 .NET 枚举一样,Seq.cache 没有可以有效链接的尾部(或跳过)运算符。

线程安全: 与此问题的其他解决方案不同,Seq.cache 是线程安全的,因为您可以让多个枚举器并行运行(每个枚举器都是不是线程安全的)。

性能我做了一个快速的基准测试,LazyList 枚举的开销至少是 SeqModule.Cache 变体的 4 倍,后者至少有 3 倍比自定义实现答案更多的开销。因此,虽然 F# 变体可以工作,但它们并没有那么快。请注意,与执行(例如)I/O 或任何非平凡计算的可枚举相比,慢 3-12 倍仍然不是很慢,所以这在大多数情况下可能无关紧要,但最好保持介意。

TL;DR如果您需要一个高效、线程安全的缓存枚举,只需使用SeqModule.Cache

【讨论】:

  • 谢谢Eamon,F# 充满惊喜。 :) +1
  • @Pragmateek 是的 - 这只是 F# 中的 Seq.cache
【解决方案4】:

这是一个不完整但紧凑的“功能性”实现(未定义新类型)。

错误是它不允许同时枚举。


原文说明: 第一个函数应该是第二个函数中的匿名 lambda,但是 C# does not allow yield in anonymous lambdas:

// put these in some extensions class

private static IEnumerable<T> EnumerateAndCache<T>(IEnumerator<T> enumerator, List<T> cache)
{
    while (enumerator.MoveNext())
    {
        var current = enumerator.Current;
        cache.Add(current);
        yield return current;
    }
}
public static IEnumerable<T> ToCachedEnumerable<T>(this IEnumerable<T> enumerable)
{
    var enumerator = enumerable.GetEnumerator();
    var cache = new List<T>();
    return cache.Concat(EnumerateAndCache(enumerator, cache));
}

用法:

var enumerable = Numbers.ToCachedEnumerable();

【讨论】:

  • 这是错误的:它不支持多个同时迭代。例如。 cached.ZipWith(cached.Skip(1), Tuple.Create) 会崩溃 - 请注意,这是一个特别有趣的支持案例,因为缓存可以同时确保列表只被评估一次,但它也是惰性的。
  • 另外,没有必要使用双嵌套函​​数 - 无论如何,您正在评估 em。
  • 糟糕,那个双重匿名 lambda 彻底溜走了。固定。
  • 你对这个错误也是正确的。我将把这个答案作为“如何不去做”
【解决方案5】:

我希望这个答案结合了sinelaw's answer 的简洁明了和Timothy's answer 对多个枚举的支持:

public static IEnumerable<T> Cached<T>(this IEnumerable<T> enumerable) {
    return CachedImpl(enumerable.GetEnumerator(), new List<T>());
}

static IEnumerable<T> CachedImpl<T>(IEnumerator<T> source, List<T> buffer) {
    int pos=0;
    while(true) {
        if(pos == buffer.Count) 
            if (source.MoveNext()) 
                buffer.Add(source.Current); 
            else 
                yield break;
        yield return buffer[pos++];
    }
}

关键思想是使用yield return 语法来实现一个简短的可枚举实现,但是您仍然需要一个状态机来决定您是否可以从缓冲区中获取下一个元素,或者您是否需要检查底层枚举器。

限制:这并没有试图成为线程安全的,也没有处理底层枚举器(通常,作为底层的枚举器是相当棘手的)只要仍然可以使用任何缓存的枚举器,未缓存的枚举器就必须保持未释放)。

【讨论】:

  • 不错。它还通过了 Zip 测试。
  • 是的。遗憾的是,正如您所指出的,它需要一个毫无意义的包装方法,但仍然比所有手动接口实现的东西更好。
  • 我添加了更长的another solution,但使用了模拟匿名迭代器的通用模式,所以更漂亮一些。
  • @EamonNerbonne:可爱 :) 谢谢。 +1
  • 当你有一个悬空的else 时,在你的if 周围使用大括号通常是个好主意,就像你在这里一样。
【解决方案6】:

Eamon's answer above 的基础上,这是另一个功能性解决方案(没有新类型),它也适用于同时评估。这表明了一个通用模式(共享状态的迭代)是这个问题的基础。

首先我们定义了一个非常通用的辅助方法,目的是让我们可以模拟anonymous iterators in C#缺失的特性:

public static IEnumerable<T> Generate<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

Generate 就像一个带有状态的聚合器。它接受一个返回初始状态的函数,以及一个匿名的生成器函数,其中包含yield return,如果在 C# 中允许的话。 initialize 返回的状态是每个枚举的,而更全局的状态(在所有枚举之间共享)可以由生成的调用者维护,例如在闭包变量中,如下所示。

现在我们可以用它来解决“缓冲的可枚举”问题:

public static IEnumerable<T> Cached<T>(IEnumerable<T> enumerable)
{
    var cache = new List<T>();
    var enumerator = enumerable.GetEnumerator();

    return Generate<T>(() =>
    {
        int pos = -1;
        return () => {
            pos += 1;
            if (pos < cache.Count())
            {
                return new Tuple<T>(cache[pos]);
            }
            if (enumerator.MoveNext())
            {
                cache.Add(enumerator.Current);
                return new Tuple<T>(enumerator.Current);
            }
            return null;
        };
    });
}

【讨论】:

  • 感谢这个sinelaw。 :) +1
  • 使用Tuple&lt;T&gt; 作为可选的T 实际上是我以前从未想过的。肯定是一个绝妙的技巧。 +1
  • @TimothyShields 嗯,我不认为这是一个很好的技巧 - 它有点误导。如果您想要和可选值,为什么要使用(微不足道的)类 OptionalValueOptionalReference - 精心挑选的名称有助于代码可维护性。
  • @sinelaw:我喜欢这个想法,但是您在传递参数时不必要地有创意:您可以使用闭包避免“通过数组引用 int”技巧(即生成参数可以是 @ 987654331@然后);并且您可能想要命名生成器状态的概念(即生成参数可以是Func&lt;Func&lt;ValueOrEnd&gt;&gt;
  • 很好的答案,谢谢。我开始使用这段代码作为起点,并为它编写了一些测试。我的测试暴露了这样一个事实,即每次重复使用缓冲结果(到达“结束”时)在原始枚举器上调用一次“MoveNext”。这几乎永远不会成为问题,因为您会想象 IEnumerator 的大多数实现都会有一些状态并且知道它们已经完成,但我不确定这是否得到保证。如果打算完全重播第一次发生的事情,那么可以说闭包中应该有另一个状态变量,例如bool completed
【解决方案7】:

感谢Eamon Nerbonnesinelaw 他们的回答,只是一些调整!首先,在完成时释放枚举器。其次,使用锁保护底层枚举器,以便可以在多个线程上安全地使用枚举器。

// This is just the same as @sinelaw's Generator but I didn't like the name
public static IEnumerable<T> AnonymousIterator<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

// Cached/Buffered/Replay behaviour
public static IEnumerable<T> Buffer<T>(this IEnumerable<T> self)
{
    // Rows are stored here when they've been fetched once
    var cache = new List<T>();

    // This counter is thread-safe in that it is incremented after the item has been added to the list,
    // hence it will never give a false positive. It may give a false negative, but that falls through
    // to the code which takes the lock so it's ok.
    var count = 0;

    // The enumerator is retained until it completes, then it is discarded.
    var enumerator = self.GetEnumerator();

    // This lock protects the enumerator only. The enumerable could be used on multiple threads
    // and the enumerator would then be shared among them, but enumerators are inherently not
    // thread-safe so a) we must protect that with a lock and b) we don't need to try and be
    // thread-safe in our own enumerator
    var lockObject = new object();

    return AnonymousIterator<T>(() =>
    {
        int pos = -1;
        return () =>
        {
            pos += 1;
            if (pos < count)
            {
                return new Tuple<T>(cache[pos]);
            }
            // Only take the lock when we need to
            lock (lockObject)
            {
                // The counter could have been updated between the check above and this one,
                // so now we have the lock we must check again
                if (pos < count)
                {
                    return new Tuple<T>(cache[pos]);
                }

                // Enumerator is set to null when it has completed
                if (enumerator != null)
                {
                    if (enumerator.MoveNext())
                    {
                        cache.Add(enumerator.Current);
                        count += 1;
                        return new Tuple<T>(enumerator.Current);
                    }
                    else
                    {
                        enumerator = null;
                    }
                }
            }
        }
        return null;
    };
});

}

【讨论】:

  • 有一个竞争条件使该代码无法成为线程安全的。两个线程尝试获取列表中的最后一项。线程 A 检查pos &lt; count 以查看是否有缓存结果;没有。线程 B 检查pos &lt; count 以查看是否有缓存结果;没有。线程 B 移动到最后一项并返回它。线程 B 尝试获取下一项,遇到列表末尾,并设置enumerator=null。线程 A 检查enumerator != null,发现它是nullreturn null,而不是返回最后一项。
  • 你说的对,谢谢!我已经编辑了代码以删除对枚举器的外部检查,我认为这可以解决问题。你同意吗?
【解决方案8】:

我使用以下扩展方法。

这样,输入以最大速度读取,消费者以最大速度处理。

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> input)
{
    var blockingCollection = new BlockingCollection<T>();

    //read from the input
    Task.Factory.StartNew(() =>
    {
        foreach (var item in input)
        {
            blockingCollection.Add(item);
        }

        blockingCollection.CompleteAdding();
    });

    foreach (var item in blockingCollection.GetConsumingEnumerable())
    {
        yield return item;
    }
}

使用示例

这个例子有一个快速的生产者(查找文件)和一个缓慢的消费者(上传文件)。

long uploaded = 0;
long total = 0;

Directory
    .EnumerateFiles(inputFolder, "*.jpg", SearchOption.AllDirectories)
    .Select(filename =>
    {
        total++;
        return filename;
    })
    .Buffer()
    .ForEach(filename =>
    {
        //pretend to do something slow, like upload the file.
        Thread.Sleep(1000);
        uploaded++;

        Console.WriteLine($"Uploaded {uploaded:N0}/{total:N0}");
    });

【讨论】:

  • 你有没有测量过这个,看看你的断言是否正确?我对ConcurrentQueue 的体验是,锁定会使这个速度变慢。
  • 这也会提升 CPU。如果input 很慢,yield return 循环只会在 CPU 上旋转。
  • 谢谢@Enigmativity,我把它从ConcurrentQueue 改成了BlockingCollection
  • 对不起,任何形式的并发或阻塞收集都是一样的。
猜你喜欢
  • 2016-12-23
  • 2010-10-24
  • 1970-01-01
  • 2015-04-11
  • 1970-01-01
  • 2011-06-16
  • 2012-07-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多