【问题标题】:Removal of nested observable from stream从流中移除嵌套的 observable
【发布时间】:2013-07-11 21:11:24
【问题描述】:

我有一个类ObservableCollection<T>,它代表一个不断变化的集合:

public interface IObservableCollection<T> : IObservable<IEnumerable<T>>
{
    void Add(T item);
    void Remove(T item);
}

添加或删除项目时,Subject&lt;IEnumerable&lt;T&gt;&gt; 内部会调用其 OnNext 方法,该方法使用新的 IEnumerable&lt;T&gt; 调用,该方法通过 IObservableCollection&lt;T&gt;Subscribe 方法公开。

我还有一门课Person:

public interface IPerson
{
    string Name { get; }
    IObservable<int> Position { get; }
}

我想要做的是产生一个IEnumerable&lt;Tuple&lt;string, int&gt;&gt; 流,代表每个人的位置,一个人在集合中的位置。这似乎相对简单:

var peopleCollectionStream = new ObservableCollection<IPerson>();

var peoplePositions = from people in peopleCollectionStream
                      from updateList in
                          (from person in people
                           select person.Position.Select(pos => Tuple.Create(person.Name, pos)))
                           .CombineLatest()
                      select updateList;

我现在可以像这样订阅流:

peoplePositions
    .Subscribe(people =>
    {
        Console.WriteLine("Something was updated");
        foreach (var personTuple in people)
            Console.WriteLine("{0} -> {1}", personTuple.Item1, personTuple.Item2);
    });

我得到了想要的输出:

var alice = new Person() { Name = "Alice" };
peopleCollectionStream.Add(alice);        // Alice -> 0
alice.Move(2);                            // Alice -> 2
var bob = new Person() { Name = "Bob" };
peopleCollectionStream.Add(bob);          // Alice -> 2, Bob -> 0
bob.Move(3);                              // Alice -> 2, Bob -> 3

当我希望从集合中删除一个人并因此从流中排除他们的更新时会出现问题:

peopleCollectionStream.Remove(bob);       // Alice -> 2
bob.Move(4);                              // Alice -> 2, Bob -> 4

如果 Bob 从集合中删除,我想阻止他的位置更新被包括在内。我该怎么做?

【问题讨论】:

    标签: c# system.reactive reactive-programming


    【解决方案1】:

    我发现尝试使用 Add 和 Remove 事件是一个坏主意,如果您想做这些功能性的事情。将删除与添加相匹配,并确保底层代码也能做到这一点,是很多工作。

    我所做的是使用易腐烂的物品/收藏品。我将每个项目与一个生命周期(取消令牌)配对,并且该项目在其生命周期结束时被视为已删除。然后我在连接其他东西时使用这些生命周期。我使用了一个名为PerishableCollection&lt;T&gt; 的集合类型,它采用与生命周期配对的项目,并允许您以IObservable&lt;Perishable&lt;T&gt;&gt; 的形式查看其内容。

    wrote a blog post about perishable collections,并发布了nuget library you can reference

    下面的代码应该扁平化一个 perishable 集合的 perishable 集合:

    public static PerishableCollection<T> Flattened<T>(this PerishableCollection<PerishableCollection<T>> collectionOfCollections, Lifetime lifetimeOfResult) {
        if (collectionOfCollections == null) throw new ArgumentNullException("collectionOfCollections");
    
        var flattenedCollection = new PerishableCollection<T>();
        collectionOfCollections.CurrentAndFutureItems().Subscribe(
            c => c.Value.CurrentAndFutureItems().Subscribe(
    
                // OnItem: include in result, but prevent lifetimes from exceeding source's lifetime
                e => flattenedCollection.Add(
                    item: e.Value,
                    lifetime: e.Lifetime.Min(c.Lifetime)),
    
                // subscription to c ends when the c's lifetime ends or result is no longer needed
                c.Lifetime.Min(lifetimeOfResult)),
    
            // subscription ends when result is no longer needed
            lifetimeOfResult);
    
        return flattenedCollection;
    }
    

    上面的工作是通过订阅接收添加到集合集合中的集合,然后为每个订阅接收项目的人。这些项目被放入生成的集合中,其生命周期在项目死亡或其集合死亡时结束。当赋予该方法的生命周期终止时,所有订阅都将终止。

    解决此问题的另一种方法是编写一个方法来展平IObservable&lt;Perishable&lt;IObservable&lt;Perishable&lt;T&gt;&gt;&gt;&gt;。这样做的好处是不需要调用者如此明确地管理结果的生命周期并适用于更多情况。但是,该方法更难编写,因为您必须以线程安全的方式处理失败/完成的序列。

    这是一个使用 flatten 方法的示例(创建一个新的控制台应用程序,引用 perishable 集合,粘贴上述方法和这个方法):

    using TwistedOak.Collections;
    using TwistedOak.Util;
    
    static void Main() {
        var p = new PerishableCollection<PerishableCollection<string>>();
        var f = p.Flattened(Lifetime.Immortal);
        f.CurrentAndFutureItems().Subscribe(e => {
            Console.WriteLine("{0} added to flattened", e.Value);
            e.Lifetime.WhenDead(() => Console.WriteLine("{0} removed from flattened", e.Value));
        });
    
        // add some 'c' items to f via p
        var c = new PerishableCollection<string>();
        var cLife = new LifetimeSource();
        c.Add("candy", Lifetime.Immortal);
        p.Add(c, cLife.Lifetime);
        c.Add("cane", Lifetime.Immortal);
    
        // add some 'd' items to f via p
        var d = new PerishableCollection<string>();
        p.Add(d, Lifetime.Immortal);
        d.Add("door", Lifetime.Immortal);
        d.Add("dock", Lifetime.Immortal);
    
    
        // should remove c's items from f via removing c from p
        cLife.EndLifetime();
    }
    

    代码应该输出:

    candy added to flattened
    cane added to flattened
    door added to flattened
    dock added to flattened
    candy removed from flattened
    cane removed from flattened
    

    希望这足以让您走上更轻松的道路。

    【讨论】:

    • 这是一个非常有趣的概念,谢谢 - 我必须进一步研究它,看看它是否适用于我的情况。博客也不错
    【解决方案2】:

    答案是.Switch 运算符。通过仅选择要订阅的最新 observable 列表,流会排除最新版本的集合中不存在的任何内容:

    var peoplePositions = (from people in peopleCollectionStream
                           select
                               (from person in people
                                select person.Position
                                    .Select(pos => Tuple.Create(person.Name, pos))
                                ).CombineLatest()
                           ).Switch();
    

    (顺便说一句,如果有人对使用方括号/嵌套 linq 查询语法时的格式有任何好的建议,请告诉我,因为上面看起来很糟糕!)

    【讨论】:

    • 评论您的建议请求。两件事:1)我会使用 from... from 语法(对于 SelectMany)而不是最里面的 .Select。其次,当嵌套变得过于复杂时,请考虑将其重构为单独的方法。如果您使用匿名类型,这会变得很困难,但至少在此示例中您似乎是安全的。
    • 谢谢,中肯的建议。我不经常使用查询语法,但我确实发现嵌套时更容易编写。在我的演示项目中,我实际上使用了匿名类型,但在实际场景中,我肯定会将这些块拆分为方法并使用专用结构。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多