【问题标题】:Rxjava - emit on PublishSubject before subscribeRxjava - 在订阅前在 PublishSubject 上发出
【发布时间】:2019-05-03 03:15:53
【问题描述】:

考虑一个场景,我们有一个流发射字符串,我们想将字符串保存在文件中。

我正在使用 PublishSubject,这很好用:

Subject<String> stream = PublishSubject.create();
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string1")
mReverseGeocoderStream.onNext("some-string2")

但是,这不起作用(只有some-string2 被发送)

Subject<String> stream = PublishSubject.create();
mReverseGeocoderStream.onNext("some-string1")
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string2")

有没有办法让第二个场景也能正常工作?

即,我们可以更改PublishSubject 以确保它缓冲事件直到订阅者消费它们?

请注意,BehaviorSubject 不是一个选项,因为重新订阅会导致另一个文件保存。它没有“消费事件”的概念。

我发现 UnicastSubject 几乎是我想要的,但当我取消订阅并随后使用不同的订阅者重新订阅时,它会因 IllegalStateException 而失败。


用例:

假设我们有一个安卓应用。它发出一个网络请求,在网络请求的后面,它需要显示一个对话框。在发出请求时,用户将应用程序置于后台。此时,我们取消订阅正在监听信号以显示对话框的观察者。

网络请求返回并显示对话框被触发到流中的信号。此时没有人在听。用户前景应用程序。一个新的订阅者被附加到网络请求管理器 (ViewModel)。此时,我希望将“未使用”信号传递给订阅者。

注意:我不能使用行为主题。如果我这样做,每次用户背景和前景应用程序都会显示对话框。我希望在显示对话框后使用并完成事件。

【问题讨论】:

  • 您问题的第一部分让我认为您肯定需要一个 ReplaySubject,但随后您提到您想在程序的不同点取消订阅并再次订阅并获得排放但忽略它们(不要将它们保存在文件中)。这很令人困惑。我觉得问题与主题类型无关。您能否进一步解释您想要实现的目标是什么?
  • 我添加了一个用例。让我知道是否清楚。我还添加了一个潜在的解决方案。如果您有更好的解决方案,请告诉我。
  • 嗯不确定这是否会有所帮助,或者它是否是一个更好的解决方案,但我在我的 Android 应用程序中使用 LiveData 时遇到了类似的用例。我发现的建议方法是创建一个名为 Event 的包装器,如下所示:gist.github.com/dglozano/dd02efa15d070c5517c9e62e212ecd24。然后,您可以使用每次都会发出 Event 的 Behavior 主题,但只有在尚未处理时才会处理它。
  • 不,不幸的是,根据 RxJava 标准,这是反模式。对象应该是不可变的。一旦它们被发出,它们就不应该被改变。

标签: rx-java rx-java2


【解决方案1】:

做了更多的研究,发现了这个:

https://gist.github.com/xsveda/8c556516079fde97d04b4b7e14a18463

来自:Queue like Subject in RxJava

请注意,它使用 Relay,但如果您不想引入其他依赖项,则可以轻松地将 Relay 替换为 Subject。

我对其他解决方案持开放态度,也许会批评为什么这个解决方案不是一个好的解决方案。

/**
 * Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed
 * to any other Observer.
 * <p>
 * This relay holds an unbounded internal buffer.
 * <p>
 * This relay allows only a single Observer at a time to be subscribed to it.
 * <p>
 * If more than one Observer attempts to subscribe to this Relay at the same time, they
 * will receive an IllegalStateException.
 *
 * @param <T> the value type received and emitted by this Relay subclass
 */
public final class CacheRelay<T> extends Relay<T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final PublishRelay<T> relay = PublishRelay.create();

    private CacheRelay() {
    }

    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }

    @Override
    public void accept(T value) {
        if (relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
        } else {
            for (T element; (element = queue.poll()) != null; ) {
                observer.onNext(element);
            }
            relay.subscribeActual(observer);
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-25
    • 1970-01-01
    • 2020-10-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多