【问题标题】:Efficiently combine many IObservable<bool> streams with boolean operators有效地将许多 IObservable<bool> 流与布尔运算符结合起来
【发布时间】:2026-01-04 20:25:01
【问题描述】:

我希望组合许多 IObservable&lt;bool&gt; 流,以便当所有流的最新值为 true 时,发出 true,否则发出 false。

CombinedLast 可以让我轻松地为两个流构建类似的东西,但是 a) 我不确定 API 是否可以轻松组合数千个流 b) 我不确定它的效率如何即使可以。

All 有点类似于我想要的,除了我假设它适用于单个序列并且一旦 false 不能动态变回 true。

虽然DistintUntilChanged 运算符对此可能没有效率,但我还需要将值“区分到更改”?

我希望有一个 O(1) 算法。

【问题讨论】:

  • “所有其他时间”是指当任何流提供值时结果 Observable 应该发出一个值?
  • 流的数量是恒定的吗?还是动态的?
  • @shlomo 是的,流的数量是恒定的
  • @supertopi 我的意思是,当任何最新输入变为假时,它会发出假,(但在下一个真之前不应再次表现假)

标签: system.reactive reactive-programming


【解决方案1】:

结合最新的一个好方法是从IObservable&lt;IObservable&lt;T&gt;&gt; 开始,然后将其转换为IObservable&lt;T[]&gt;。这成为一种非常动态的方式来组合您需要的任意数量的值。

这是一个扩展方法:

public static IObservable<T[]> CombineLatest<T>(this IObservable<IObservable<T>> sources)
{
    return
        sources.Publish(ss =>
            Observable.Create<T[]>(o =>
            {
                var composite = new CompositeDisposable();
                var list = new List<T>();
                composite.Add(
                    ss.Subscribe(source =>
                    {
                        var index = list.Count;
                        list.Add(default(T));
                        composite.Add(source.Subscribe(x => list[index] = x));
                    }));
                composite.Add(ss.Merge().Select(x => list.ToArray()).Subscribe(o));
                return composite;
            }));
}

这很好地创建和跟踪所有订阅,并使用一个闭包来定义每个订阅需要使用的index 来更新其在用于输出的list 中的值。

如果你这样使用它:

var sources = new Subject<IObservable<bool>>();

var output = sources.CombineLatest();

output.Subscribe(x => Console.WriteLine(x));

var s1 = new Subject<bool>();
sources.OnNext(s1);
s1.OnNext(true);
var s2 = new Subject<bool>();
sources.OnNext(s2);
s2.OnNext(false);
var s3 = new Subject<bool>();
sources.OnNext(s3);
s3.OnNext(true);
s2.OnNext(true);
s1.OnNext(false);

然后你得到这个输出:

如果您将output 的定义更改为var output = sources.CombineLatest().Select(xs =&gt; xs.Aggregate((x, y) =&gt; x &amp; y));,那么您会得到我认为您需要的输出:

真的 错误的 错误的 真的 错误的

【讨论】:

  • 谢谢。看起来它会工作。但我有一种直觉,有一种巧妙的函数式方法,不需要您在每个 OnNext 上不断地重新评估数组中的布尔运算符。 @schlomo 正在使用可变状态跟踪(计数),它应该可以工作,但我觉得必须有一种聪明的方法来解决这个问题,它甚至可能适用于所有不同的布尔运算符。但也许不是¯_(ツ)_/¯
  • @Schneider - “在每个 OnNext 上不断地重新评估数组中的布尔运算符”是什么意思?
  • 对于任何 n 个子流中的每个 OnNext,您都在调用 .Aggregate,这是一个 O(n) 操作。
  • @Shlomo - 其中n 是外部可观察对象产生的可观察对象的数量 - 实际上会很低。
  • @Schneider - @ 通知仅适用于一位用户。如果要发送两个通知,则需要发布两个 cmets。
【解决方案2】:

我不知道如何以经典的功能方式做到这一点,但仍然达到 O(1)。这使用了可变状态,观察每条消息是 O(1),但内存是 O(n):

public IObservable<bool> CombineBooleans(this IObservable<bool>[] source)
{
    return source.Select((o, i) => o.Select(b => (value: b, index: i)))
        .Merge()
        .Scan((array: new bool[source.Length], countFalse: source.Length), (state, item) =>
        {
            var countFalse = state.countFalse;

            if (state.array[item.index] == item.value)
                return (state.array, countFalse);           //nothing to change, emit same state
            else if (state.array[item.index])               //previous/current state is true, becoming false
            {
                countFalse++;
                state.array[item.index] = false;
            }
            else                                            //previous/current state is false, becoming true
            {
                countFalse--;
                state.array[item.index] = true;
            }
            return (state.array, countFalse);
        })
        .Scan((countFalse: source.Length, oldCountFalse: source.Length), (state, item) => (countFalse: item.countFalse, oldCountFalse: state.countFalse))
        .SelectMany(state =>
            state.countFalse == 1 && state.oldCountFalse == 0
                ? Observable.Return(false)
                : state.countFalse == 0 && state.oldCountFalse == 1
                    ? Observable.Return(true)
                    : Observable.Empty<bool>()
        )
        .Publish()
        .RefCount();
}

编辑:添加.Publish().Refcount() 以消除多订阅者错误。

【讨论】:

  • 小心使用.Publish().Refcount(),因为它使可观察对象只产生一次值。这不是一直使用的好模式。您看到了哪些多个订阅者错误?
  • 我的错;在这里并不重要。我在想如果你有双重订阅者,那么你就会得到双重突变。但这对状态没有任何影响。
最近更新 更多