【问题标题】:rxjs5: Delay unsubscription of a shared observablerxjs5:延迟取消订阅共享的 observable
【发布时间】:2016-12-07 06:45:22
【问题描述】:

我有一个创建成本很高的 observable,所以我有 shared 它。但是,在某些情况下,所有订阅者都会取消订阅,然后立即(或在短暂延迟后)有新订阅者订阅。

实际的 observable 过于复杂,无法在此处复制,但为了论证:

const heavyObservable = Rx.Observable.create((observer) => {
    console.log('I am expensive, avoid hitting this code');

    return Rx.Observable
            .interval(500) // these updates are cheap though!
            .subscribe(observer)
                .add(() => {
                    console.log('Cache has been destroyed, will have to be rebuild on next call');
                });
});

我不想使用创建这个 observable 所涉及的昂贵代码。我想将断开连接延迟到 n 毫秒之后。有没有办法做到这一点?

const sharedObservable = heavyObservable
    .publish()
    // ideally I'm looking for a way to get refCount to wait for new 
    // subscribers for n ms before unsubscribing when refcount === 0
    .refCount(); 

// calling subscribe here invokes heavyObservable which can take a bit of time
const subscription1 = sharedObservable.subscribe();
// log: I am expensive, avoid hitting this code

// second call is "free" - the underlying observable is reused
const subscription2 = sharedObservable.subscribe();

subscription1.unsubscribe();
subscription2.unsubscribe();

// calling subscribe again here invokes heavyObservable over again
const subscription3 = sharedObservable.subscribe();
// log: I am expensive, avoid hitting this code

【问题讨论】:

    标签: typescript rxjs5


    【解决方案1】:

    尝试解决这个问题。下面的函数包装了提供的 ConnectableObservable source 并维护订阅者的 refCount。它在第一个订阅者订阅时调用connect(),然后当最后一个订阅者取消订阅时,在delay ms 之后从source 调用setTimeoutunsubscribes

    理想情况下,我宁愿修改现有的 refCount observable,但老实说我不理解代码。

    不确定这是否涵盖了所有可能的边缘情况,或者它是否会产生意想不到的副作用。

    Plunker:https://jsbin.com/wafahusitu/edit?js,console

    function refCountWithUnsubscriptionDelay<T>(source: Rx.ConnectableObservable<T>, delay: number): Rx.Observable<T> {
    
        const refCount = 0;
        const sub;
        let timeoutRef;
    
        return Rx.Observable.create((observer: Rx.Observer<T>) => {
            refCount++;
            if (timeoutRef) {
                clearTimeout(timeoutRef);
            }
            console.log('refCount = ' + refCount);
            if (!sub) {
                // connect on first call
                sub = source.connect();
            }
    
            return source.subscribe(observer)
                    .add(function () {
                        refCount --;
                        if (refCount <= 0) {
                            // trigger delayed unsubscription if there are no listeners
                            timeoutRef = setTimeout(() => {
                                // don't unsubscribe if new listeners have subscribed
                                if (refCount <= 0) {
                                    console.log('unsub');
                                    sub.unsubscribe();
                                    sub = undefined;
                                    timeoutRef = undefined;
                                }
                            }, delay);
                        }
                    });
        })
    }
    

    【讨论】:

    • 老实说:这看起来不像是一个解决方案,应该与其他人共享——它可能“以某种方式”对你有用——但这绝对不是 rxjs 的工作方式——而且我绝对确信可以通过为您的“重”流使用不同的设置来避免这种情况 - 但如果没有看到该流,实际上不可能帮助您 - 为了“保存”可能偶然发现的其他用户这个问题:请删除您的答案,因为它对您的案例来说太具体了,并且使用了非 rxjs 做法。
    • 感谢 olsn。您能否更具体地了解非 rxjs 实践是什么?或者有一些你可以指点我的参考资料吗?你似乎强烈地觉得这是一个糟糕的解决方案。如果您能提供更多关于它为什么如此糟糕的指导,这将极大地帮助我的学习。
    • 改变订阅机制就像买一把扳手,然后把它改成一把刀——如果你不能发布你的信息流,也许你可以通过几个步骤来解释你的信息流在订阅时应该做什么已制作以及该订阅中预期的数据(请随时给我留言:pure.onh at gmail)
    【解决方案2】:

    如果没有完全取消订阅,则不会发出新数据(除非在流的开头有触发器,这在您的问题中并不明显)。 - subscription1subscription2 在您的情况下应该收到相同的值。 如果这是设计使然,那么您可以根本不使用refCount(),而只是发布然后执行sharedObservable.connect(),在这种情况下它总是“热”的。 另一个选项可能是publishReplay(1) 而不是publish()

    无论如何,您的情况听起来有点奇怪,并且很可能可以通过更改数据流的一般架构来解决 - 但是在不了解实际用例的情况下,很难说出 rxjs-operation 会发生什么在这里做最好的。

    【讨论】:

    • 有问题的 observable 轮询数据、转换数据并使其保持最新。每当转换后数据发生变化时,它就会发出。转换最初很昂贵,但维护起来很便宜。我可以使用连接,这将使服务器连接永久打开。不过,如果可能的话,我想断开连接。大多数情况下,这工作正常,但我有一些边缘情况,其中有快速取消订阅和重新订阅。我可以通过添加一个显式的、长期存在的子程序在调用者中处理这个问题,但这是要记住做和清理的额外事情。我宁愿让 observable 处理订阅“平滑”
    • 你能发布那个流吗? - 如果没有全貌,几乎不可能提供适当的帮助(当然您可以更改 url 等..)
    • 我很乐意,但它太大了,无法在这里复制。显然,如果我可以使代码更便宜地运行或避免运行它,那将是最好的,但我认为这超出了堆栈溢出问题的范围。我添加了一个 micky-mouse 示例来尝试更好地说明目标。
    • 我已经制定了一个可能的解决方案,不知道它是否很棒,但我会添加它作为答案......
    猜你喜欢
    • 2021-06-22
    • 1970-01-01
    • 2021-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-19
    • 1970-01-01
    • 2020-08-29
    相关资源
    最近更新 更多