【问题标题】:RxJava2: Subscription not reachedRxJava2:未达到订阅
【发布时间】:2017-06-06 12:44:56
【问题描述】:

我创建了这个订阅,它创建了另一个订阅:

this.loggingInterval$
    .filter(interval -> interval > 0)
    .doAfterNext(
        interval ->
        {
            Observable.interval(interval, TimeUnit.SECONDS)
                .doOnNext(t -> this.readAdditionalConfigurationFile())
                .doOnError(t -> LoggerFactory.getLogger(this.getClass()).error(t.toString()))
                .takeUntil(this.destroy$)
                .subscribe();
        }
    )
    .takeUntil(this.destroy$)
    .subscribe();

因此,每次interval 更改并且大于 0 时,我都会在每个新的 interval 秒内构建另一个订阅读取配置文件。当destroy$ 发出true 时,它会被销毁。

问题是永远无法到达interval.doOnNext()

this.destroy$ = BehaviorSubject.createDefault(false);
this.loggingInterval$ = BehaviorSubject.createDefault(10);

有什么想法吗?

【问题讨论】:

    标签: rx-java


    【解决方案1】:

    在 observable 中创建订阅是一种反模式;有很多更简单的方法。

    this
    .loggingInterval$
    .filter(interval -> interval > 0)
    .distinctUntilChanged()
    .switchMap(interval -> Observable.interval(interval, TimeUnit.SECONDS, Schedulers.io()))
    .doOnNext(dummy -> this.readAdditionalConfigurationFile())
    .doOnError(t -> LoggerFactory.getLogger(this.getClass()).error(t.toString()))
    .takeUntil(this.destroy$)
    .subscribe();
    

    如果订阅被取消订阅,上述操作还具有终止整个链的好处(这是你的 destroy$ 应该做的,对吧?)。

    【讨论】:

    • 是的!它看起来真的很好!尽管如此,我还是要评论另一件事。我已经编辑了帖子。
    • 因此,记录间隔的 BehaviorSubject<> 实际上是更改 f.e. 值的“正确”方式。通过 JMX 公开。但是destroy$ 有点臭,你可以用普通的private boolean destroy;.takeUntil(dummy -> destroy) 替换它——或者直接取消订阅。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-11-22
    • 2023-03-23
    • 2019-05-19
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多