【问题标题】:access IObservable inside same IObservable subscription在同一个 Observable 订阅中访问 Observable
【发布时间】:2011-10-12 12:36:50
【问题描述】:

这是我尝试使用 Reactive Extensions 做的一个简单示例,但它不起作用

Add 在这个简单的例子中不起作用

    public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }

    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum + i);
        return sum;
    }

实际输出

1
_

期望的输出

1
6
2
6
3
6
Sequence Completed
_

即我想在每次迭代中执行一个方法 Add(obs),其中 obs 本身就是正在迭代的冷 IObservable?

【问题讨论】:

  • 你很可能在这里做错了(tm)。你能描述一下你的场景以及你想要在更高层次上完成什么吗?
  • @Paul 请看我刚刚添加到 Enigmativity 的评论 ..

标签: c# iterator system.reactive


【解决方案1】:

改变这个:

IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)

到这里:

IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)

您应该注意,就 Rx 而言,您正在做一件坏事。您正在进出可观察对象。您应该尽可能避免这种情况。

因此,例如,避免这种情况:

    var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();

如果相同:

    var obs = Observable.Range(1, 3);

整个static int Add(IObservable&lt;int&gt; wholeList) 方法也很糟糕。它调用ForEach(通常应该是警告您做错了什么)从可观察对象中取出值。这就是可能发生死锁的地方。

已经有一个名为Sum 的可观察扩展,它返回一个IObservble&lt;int&gt;,这不会让你脱离可观察。

所以试着这样写你的代码:

var obs = Observable.Range(1, 3);

var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };

using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}

我希望这会有所帮助。

【讨论】:

  • 这是一个漂亮的答案,我接受你的观点!不幸的是,由于此处未说明的原因,我需要使用 .SubscribeOn(Scheduler.NewThread),是否有另一种方法可以在每次迭代结束时执行 MyMethod(obs)?
  • @Cel - 我将示例代码更改为使用SubscribeOn 而不是ObserveOn,它仍然有效。这是一个示例,说明当超出死锁时留在可观察对象中的工作原理,因此您应该避免调用 MyMethod(obs)
  • 我认为我应该首先发布预期的 MyMethod(obs),因为 Add(obs) 并不能很好地模拟场景。所需的 MyMethod 实际上是 SaveMessages(obs),所以我想做的不是对整数求和,而是在每个单独的消息迭代结束时,我想保存 obs 中包含的所有消息。我不想只在 OnCompleted 保存 obs 中的所有元素,我需要在每次迭代结束时这样做?
  • @Cel - 然后将签名更改为static IObservable&lt;int&gt; Add(IObservable&lt;int&gt; wholeList),然后在方法内使用标准运算符或Observable.Create&lt;T&gt;(...)
【解决方案2】:

与您的评论一样,我建议您让 observable 根据需要生成项目,而不是在订阅后执行此操作。在您的示例中,您可以执行以下操作:

var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));

obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-12-03
    • 2020-08-29
    • 2017-03-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多