【发布时间】:2015-02-26 02:45:58
【问题描述】:
下面的代码 sn-p 是我尝试创建以下功能:
- 创建订阅主题集合的可观察序列
- 当集合中的一个主题产生值时,序列结束,调用返回一组新主题的方法并从 1 重新开始。
- 当对外部 observable 的订阅被处理后,整个事情就停止了
关于我的实施的问题:
- 为什么它可以使用 subjectsSub.SelectMany(x => x).Merge() 而不是使用 subjectsSub.Merge()? (我本来希望后一种方法能奏效)
- 在庞大的 Rx 功能库中是否有更简单、更优雅的解决方案?
更新:这个示例实际上是从 RxJS-Typescript 向后移植的,目的是让更多的受众了解这个问题。原始版本在使用 Javascript 的单线程浏览器环境中运行的事实应该更清楚地说明为什么这种“可观察捕获”可能会起作用(它确实有效,而且没有像搞乱 RxJs 内部那样的肮脏黑客行为)。
class Program
{
private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();
private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
{
// wire some traps for capturing any observables "touched" by expression
expression();
// return observables touched by expression (not in this example of course)
if (observableDependencies.Count > 0)
return observableDependencies.Dequeue();
return new[] {Observable.Never<Unit>()}; // keep going
}
private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
{
return Observable.Create<Unit>(observer =>
{
var isComplete = new Subject<Unit>();
var isAborted = false;
var disp = Scheduler.Default.Schedule(self =>
{
Console.WriteLine("** Next iteration at {0}", sw.Elapsed);
capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
{
observer.OnNext(Unit.Default);
// self-destruct
isComplete.OnNext(Unit.Default);
},
() =>
{
Console.WriteLine("completed");
if (!isAborted)
self();
});
capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
});
return new CompositeDisposable(Disposable.Create(() =>
{
isAborted = true;
// self-destruct
isComplete.OnNext(Unit.Default);
}), disp);
});
}
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
Observable.Return(10).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});
var capturedObservables = new Subject<IObservable<Unit>[]>();
var obs = CreateObservable(capturedObservables, sw);
var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
Console.ReadLine();
disp.Dispose();
Console.ReadLine();
}
}
【问题讨论】:
-
您能解释一下您要解决的实际问题吗?我想更多地了解这个“主题集合”的来源——这部分是在你的问题中假设的,但这对我来说似乎是最不习惯的部分,尤其是下游观察者触发它们再生的想法。我觉得仅仅解决上面的代码会是一种伤害,因为我想不出我会这样做的场景。
-
@JamesWorld 我已经更新了问题。
-
@JamesWorld 我希望用例提供一种神奇的计算方法,自动捕获可观察的依赖项,然后订阅它们,以便在依赖项发生变化时重新运行计算。有关 JavaScript 示例,请参阅
Knockout计算属性。我也看过 c# 示例。一个非常有用的功能,它以轻微的性能成本隐藏了 Rx 的复杂性。 -
好的,我明白你所说的......(我自己使用了敲除/角度等绑定)。我不太清楚的部分是为什么您需要丢弃主题并获得新主题,而不是设置一次并完成它。也许用一个实际的例子就清楚了……
-
@JamesWorld - 在重新评估期间,您“忘记”了以前的依赖项,而只是捕获了新的集合(结果通常与您上次评估的完全相同)。
标签: c# system.reactive