【问题标题】:Why do I need to dispose of subscriptions after completion?为什么我需要在完成后处理订阅?
【发布时间】:2017-11-17 20:56:00
【问题描述】:

Intro To RX 一书将 OnSubscribe 上的返回值描述为 IDisposible,并指出在调用 OnErrorOnCompleted 时应处理订阅。

要考虑的一个有趣的事情是,当一个序列完成或 错误,您仍然应该处理您的订阅。

来自Intro to RX: Lifetime Management, OnError and OnCompleted

这是为什么?


作为参考,这是我目前正在学习的课程。我可能会在某个时候将其提交给代码审查。

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
    private readonly Action onTimeout;
    private object signalLock = new object();
    private IObserver<Unit> signals;

    /// <summary>
    /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
    /// </summary>
    /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
    /// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
    public TrafficTimeout(TimeSpan timeout, Action onTimeout)
    {
        // Subscribe to a throttled observable to trigger the expirey
        var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
        IDisposable subscription = null;
        subscription = messageQueue.Throttle(timeout).Subscribe(
        p =>
        {
            messageQueue.OnCompleted();
            messageQueue.Dispose();
        });

        this.signals = messageQueue.AsObserver();
        this.onTimeout = onTimeout;
    }

    /// <summary>
    /// Signals that traffic has been received.
    /// </summary>
    public void Signal()
    {
        lock (this.signalLock)
        {
            this.signals.OnNext(Unit.Default);
        }
    }
}

【问题讨论】:

    标签: system.reactive reactive-programming idisposable reactive object-lifetime


    【解决方案1】:

    Subscribe 扩展方法返回的一次性对象仅返回允许您手动取消订阅可观察对象可观察对象自然结束之前。

    如果 observable 完成 - 使用 OnCompletedOnError - 那么订阅已经为您处置了。

    试试这个代码:

    var xs = Observable.Create<int>(o =>
    {
        var d = Observable.Return(1).Subscribe(o);
        return Disposable.Create(() =>
        {
            Console.WriteLine("Disposed!");
            d.Dispose();
        });
    });
    
    var subscription = xs.Subscribe(x => Console.WriteLine(x));
    

    如果您运行上述程序,您会看到“Disposed!”当 observable 完成时写入控制台,无需您在订阅上调用 .Dispose()

    需要注意的重要一点:垃圾收集器永远不会在可观察订阅上调用 .Dispose(),因此如果订阅在您的订阅之前没有(或可能没有)自然结束,您必须处置您的订阅超出范围。

    以这个为例:

    var wc = new WebClient();
    
    var ds = Observable
        .FromEventPattern<
            DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
                h => wc.DownloadStringCompleted += h,
                h => wc.DownloadStringCompleted -= h);
    
    var subscription =
        ds.Subscribe(d =>
            Console.WriteLine(d.EventArgs.Result));
    

    ds observable 只有在它有订阅时才会附加到事件处理程序,并且只会在 observable 完成或订阅被处置时分离。由于它是一个事件处理程序,因此可观察对象永远不会完成,因为它正在等待更多事件,因此处置是与事件分离的唯一方法(对于上面的示例)。

    当你有一个FromEventPattern observable,你知道它只会返回一个值,那么明智的做法是在订阅之前添加.Take(1) 扩展方法,以允许事件处理程序自动分离,然后你就不需要手动处理订阅。

    像这样:

    var ds = Observable
        .FromEventPattern<
            DownloadStringCompletedEventHandler,
            DownloadStringCompletedEventArgs>(
                h => wc.DownloadStringCompleted += h,
                h => wc.DownloadStringCompleted -= h)
        .Take(1);
    

    我希望这会有所帮助。

    【讨论】:

    【解决方案2】:

    首先,这是您可能遇到的问题的一个示例:

    void Main()
    {
        Console.WriteLine(GC.GetTotalMemory(true));
        for (int i = 0; i < 1000; i++)
        {
            DumbSubscription();
            Console.WriteLine(GC.GetTotalMemory(true));
        }
        Console.WriteLine(GC.GetTotalMemory(true));
    }
    
    public void DumbSubscription()
    {
        Observable.Interval(TimeSpan.FromMilliseconds(50))
            .Subscribe(i => {});
    }
    

    您将看到您的内存使用量永远上升。活动的 Rx 订阅不会被垃圾收集,并且这个 observable 是无限的。因此,如果您增加循环限制或添加延迟,您只会浪费更多的内存:除了处理这些订阅之外,没有什么可以帮助您。

    但是,假设我们将DumbSubscription 的定义更改为:

    public void DumbSubscription()
    {
        Observable.Interval(TimeSpan.FromMilliseconds(50))
            .Take(1)
            .Subscribe(i => {});
    }
    

    .Take(1) 的添加意味着 observable 将在一个时间间隔后完成,因此它不再是无限的。您将看到您的内存使用稳定:订阅倾向于在完成或异常时正确处理自己。

    但是,这并没有改变这样一个事实,即与任何其他IDisposable 一样,最好的做法是调用Dispose(手动或通过using)以确保正确处理资源。此外,如果您调整 observable,您很容易遇到开头指出的内存泄漏问题。

    【讨论】:

    • 在我的示例类中,OnCompletedBehaviourSubject 上被调用,所以我猜可以安全地假设序列不是无限的。您是否适合在示例代码中对生命周期管理进行评估?或者,我为此创建了一个代码审查堆栈>>codereview.stackexchange.com/questions/180708/…
    猜你喜欢
    • 2021-12-27
    • 2017-09-06
    • 2017-06-22
    • 1970-01-01
    • 1970-01-01
    • 2015-09-15
    • 1970-01-01
    • 1970-01-01
    • 2023-03-20
    相关资源
    最近更新 更多