【问题标题】:Subscribe to observable after dispose在 dispose 后订阅 observable
【发布时间】:2017-05-21 16:23:15
【问题描述】:

我正在 android repository by Fernando Cejas 上构建我的应用程序,但在调用 dispose 后订阅 observable 时遇到问题。

当我来到仪表板时,我调用方法subscribeOnUserMessages.execute(new Subscriber(), new Params(token)),这是UseCase类中的方法

public void execute(DisposableObserver<T> observer, Params params) {
    Preconditions.checkNotNull(observer);
    final Observable<T> observable = this.buildUseCaseObservable(params)
            .subscribeOn(Schedulers.from(threadExecutor))
            .observeOn(postExecutionThread.getScheduler());
    addDisposable(observable.subscribeWith(observer));
}

在子类SubscribeOnUserMessages 中,我只需像这样调用存储库 return messageRepository.subscribeOnUserMessages(params);

在我的套接字实现中,我这样创建

return Observable.create(emitter -> {

        if (!isThereInternetConnection()) {
            Timber.w("Network connection exception");
            emitter.onError(new NetworkConnectionException());
            return;
        }

        /*
         * Open socket if not opened
         */
        openSocket(params.getToken());



        String channelName = CHANNEL_PRIVATE_USER + params.getAuthenticated().getUuid();

        if (subscribedChannels.contains(channelName)) {
            Timber.d("Channel %s is already subscribed", channelName);
            return;
        }


        JSONObject auth;

        try {
            auth = createAuthJson(CHANNEL, channelName, params.getToken());
        } catch (JSONException e) {
            Timber.e("Couldn't create auth json");
            emitter.onError(e);
            return;
        }

        mSocket.emit(SUBSCRIBE, auth);
        Timber.d("Emitted subscribe with channel: %s ", CHANNEL_PRIVATE_USER + params.getAuthenticated().getUuid());
        subscribedChannels.add(CHANNEL_PRIVATE_USER + params.getAuthenticated().getUuid());
        Timber.d("Subscribing on event: %s\n with user: %s", EVENT_USER_NEW_MESSAGE, params.getAuthenticated().getUuid());

        if (mSocket.hasListeners(EVENT_USER_NEW_MESSAGE)) {
            Timber.v("Socket already has listener on event: %s", EVENT_USER_NEW_MESSAGE);
            return;
        }


        mSocket.on(EVENT_USER_NEW_MESSAGE, args -> {
            if (args[1] == null) {
                emitter.onError(new EmptyResponseException());
            }

            Timber.d("Event - %s %s", EVENT_USER_NEW_MESSAGE, args[1].toString());

            try {
                MessageEntity messageEntity = messageEntityJsonMapper.transform(args[1]);
                emitter.onNext(messageEntity);
            } catch (JSONException e) {
                Timber.e(e, "Could not parse message json");
                emitter.onError(e);
            }
        });

    });

症状是我第一次订阅时,所有内容都进入了表示层。当我在进入第二个屏幕并返回后处理时,我只看到日志进入套接字实现,但没有通过。

我的问题是:有没有办法再次订阅相同的 observable?我已经尝试在我的用例中以单例形式保存该可观察对象并订阅该可观察对象,但没有帮助。

【问题讨论】:

  • 只需再次调用 .subscribe() 到可观察对象。
  • 再次定义你对同一个 observable 的订阅?它是否是热门的 Observable?如果你再次订阅,你想看到什么结果?
  • SocketImpl 是单例对象,负责从套接字获取消息。
  • @PhoenixWang SocketImpl 是单例对象,负责从套接字获取消息。它在订阅后开始监听套接字,所以它是冷可观察的。整个情况是,我在一个屏幕(仪表板)上订阅了消息,然后我去特定的聊天,所以我处理了那个订阅。当我回来时,我想再次订阅那个 observable。
  • @SimonHarvan 所以你的意思是你想在某些情况下“暂停”那个可观察的?那么我想你可以使用一个主题来管理你自己的上游。

标签: android rx-java


【解决方案1】:

如果没有额外的信息和细节来重新升级套接字实现,很难准确地发现问题,但是,从你发布的代码中,你没有处理逻辑,所以虽然你可以正确地调用 dispose() 到 @ 987654322@ 在正确的生命周期事件中,您的套接字实际上将保持打开状态,并且它可能永远不会正确断开/关闭。
这可能会导致在第二次打开和连接到套接字时出现问题,因为您可能会尝试重新打开已经打开的套接字,并且取决于您的内部套接字实现,这可能是一个问题。
(我可以在评论中看到 openSocket 如果尚未打开,但在其他地方多次调用套接字上的某些方法或设置侦听器时仍然可能存在问题,再次取决于套接字 impl)

作为一般准则,您应该使用emitter.setCancellable()/emitter.setDisposable() 添加处置逻辑,以便在您不再需要套接字资源时正确处置它们,因此 - 再次应用订阅时(无论是否相同)将再次调用您的订阅逻辑,这将重新打开套接字并监听它。

我不清楚您是否希望在移动到其他屏幕时保持套接字打开(我认为这不是一个好习惯,因为您将保持此资源打开并且可能永远不会回到screen 再次使用它),但如果是@Phoenix Wang 提到的情况,您可以使用发布类型运算符多播Observable,因此每个新的Subscriber 都不会尝试重新打开套接字(即调用订阅逻辑) 但只会收到有关在已打开的套接字中运行的消息的通知。

【讨论】:

  • 我刚刚尝试过emitter.setCancellable()emitter.setDisposable(),但它不会属于这些方法。也许我以某种方式没有正确初始化发射器。
  • 也试过observable.doOnDispose(),但没有成功。
  • 你处理 Observable 时它没有运行吗?您是否申请了额外的运营商?
  • 是的,我处理它时没有。第二部分没看懂我应该在哪里申请加法?
  • 第二部分我的意思是您是否将一些运算符添加到流中,无论如何,您可以发布您的代码吗?基本上你需要设置其中一个(因为一个覆盖另一个)确保在创建运行时按预期调用该集(你可以尝试将它作为第一行,并验证它不是作为某些回调的一部分应用或其他东西),这些是官方方法,所以它应该可以工作。
猜你喜欢
  • 2017-11-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多