【问题标题】:Reactive Extensions: The trouble with Where()反应式扩展:Where() 的麻烦
【发布时间】:2013-04-11 02:06:31
【问题描述】:

我喜欢 Rx,但我一直遇到一个问题。

假设我们有一个单独的上游IObservable<Foo>N 附加到下游的序列,每个序列都只对满足一些简单谓词(比如foo.bar == someKey)的那些Foos感兴趣。

当然,这对于Where() 操作员来说是一项简单的工作:

IObservable<Foo> foos = ...;
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f));
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f));
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f));
...
[many more subscriptions for different bar values]

这里本质上会发生的是,对于上游生成的每个 FooWhere() 谓词将针对 Foo N 次进行评估。它就像一个线性搜索来查找所有想要这个Foo 的订阅者。这一切都很好,这正是我们(应该)在这里使用Where() 所期望的。

我遇到的问题是,就我而言,N 可能非常大,但想要任何特定 Foo 的订阅者子集非常小。通常,每个Foo 将只有一个。这意味着当我可以进行非常有效的查找以找到该Foo 也需要传播的少数下游序列时,我实际上是在进行缓慢的线性搜索。我的应用在性能非常关键的环境中运行,我无法承受这种低效率。

我绞尽脑汁想找到一些更有效的优雅方法,但我只能想出涉及存储大量状态(映射订阅者等)并且必须非常小心地管理并发的解决方案,这首先破坏了使用 Rx 的许多目的。就现有运营商而言,我更喜欢某种方式来处理这个问题。有没有人处理过这个问题,或者知道一个好的解决方案?我很乐意提供更多详细信息。

编辑

我想我的例子有点过于简单了。我不是在处理与某个已知范围内的数值匹配的情况。 N 仅用于说明目的。更新了上面的示例。

【问题讨论】:

  • 你会为 bar 0..N 的所有可能值提供 foos,还是只提供一些?
  • 最好的办法可能是让你的 foos 保持有序。看看List.BinarySearch,然后迭代调用Subscribe直到foo.Bar &gt;= N
  • 对不起,我的例子太简单了,请参阅编辑和新的示例代码。

标签: c# system.reactive


【解决方案1】:

在 Codeplex 的 Rx 讨论板上从 Dave Sexton 那里得到了一个很好的解决方案:

https://rx.codeplex.com/discussions/439717

GroupByGroupByUntilPublish 一起使用如何?

例如:(未测试

IConnectableObservable<IGroupedObservable<string, Foo>> foosByBar = 
    (from foo in foos
     group foo by foo.bar)
    .Publish();

foosByBar.Where(g => g.Key == "abc").Take(1).SelectMany(g => g).Subscribe(A);
foosByBar.Where(g => g.Key == "xyz").Take(1).SelectMany(g => g).Subscribe(B);
foosByBar.Where(g => g.Key == "bla").Take(1).SelectMany(g => g).Subscribe(C);

foosByBar.Connect();

GroupBy 对每个键使用字典查找来找到推送值的适当可观察对象。

Publish 广播 group-by,以便所有观察者共享字典查找操作。

Where / Take 只执行一次谓词来定位适当的组,然后它接收到该组中每个值的广播 与对同一密钥感兴趣的任何其他观察者一起分组。

请注意,GroupBy 不会重播 IGroupedObservable,因此您 必须在连接之前设置您的所有订阅。如果你愿意 而不是使用 RefCount 而不是 Connect,那么也许你应该 考虑将 Replay 运算符应用于 分组依据

【讨论】:

  • 为什么不将他们在那里找到的解决方案发布到这个答案中?
  • 我认为最好从源头获取信息,但无论如何我还是这样做了,以防有一天 CodePlex 爆炸。
【解决方案2】:

有些东西正在存储状态,现在它只是存储您通过 Wheres 添加的所有订阅者的可观察对象。不清楚你是否意识到这一点,但 foos 必须在每条消息上通知它的每个观察者。 Where 所做的只是让大多数观察者简单地检查谓词并返回,但每个谓词都会检查每条消息。

构建一个封装为观察者的处理程序映射不会太困难,并且应该为您带来所需的性能提升。只需根据需要注册尽可能多的处理程序,然后将地图订阅到源 observable。如果Dictionary 没有提供您需要的匹配语义,您将不得不提出一些其他方案来减少查找,但总体思路是一样的。请注意,如果同一处理程序有多个应处理的输入,您可以多次注册它,并且您可以为同一输入注册多个处理程序。

class ObserverMap<T> : IObserver<T>
{
    ObserverMap(Action<Exception> onError, Action onCompleted)
    {
        _onError = onError;
        _onCompleted = onCompleted;
        _handlers = new Dictionary<T, List<Action<T>>>();
    }
    ObserverMap(Action<Exception> onError, Action onCompleted, IEqualityComparer<T> comparer) 
    {
        _onError = onError;
        _onCompleted = onCompleted;
        _handlers = new Dictionary<T, List<Action<T>>>(comparer);
    }

    int _stopped;
    Dictionary<T, List<Action<T>>> _handlers;
    Action<Exception> _onError;
    Action _onCompleted;

    public void OnCompleted()
    {
        if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            if (_onCompleted != null) _onCompleted();
        }
    }

    public void OnError(Exception error)
    {
        if (System.Threading.Interlocked.Exchange(ref _stopped, 1) == 0)
        {
            if (_onCompleted != null) _onCompleted();
        }
    }

    public void OnNext(T value)
    {
        if (_stopped != 0) return;

        List<Action<T>> match;
        if (_handlers.TryGetValue(value, out match))
        {
            foreach (var handler in match)
            {
                handler(value);
            }
        }
    }

    public IDisposable RegisterHandler(T key, Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");

        List<Action<T>> match;
        if (!_handlers.TryGetValue(key, out match))
        {
            match = new List<Action<T>>();
            _handlers.Add(key, match);
        }
        match.Add(handler);

        return System.Reactive.Disposables.Disposable.Create(() => match.Remove(handler));
    }
}

【讨论】:

  • 我建议,不要编写带有签名IDisposable RegisterHandler(T key, Action&lt;T&gt; handler) 的方法来添加订阅者,而是编写带有签名的索引器属性/方法,更像IObservable&lt;T&gt; this[T key] { get { return Observable.Create(...); } }
  • 是的,我知道正在存储状态以及每个 oberver 的通知(这就是我在描述线性搜索时的意思)。不过,Rx 巧妙地将其隐藏起来,我宁愿不必自己编写这种代码。我确实有一个类似于你描述的解决方案,但我希望有一个更清洁的“内置”方式。
猜你喜欢
  • 1970-01-01
  • 2018-11-25
  • 2015-06-09
  • 2023-03-09
  • 1970-01-01
  • 1970-01-01
  • 2011-03-31
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多