【问题标题】:Why RxJava's "subscribe" method called multiple times?为什么 RxJava 的“订阅”方法会被多次调用?
【发布时间】:2018-12-24 19:09:46
【问题描述】:

我正在使用 RxJava 开发一个 Android 应用程序。 例如,我想从本地数据库中获取一些用户数据。 但如果本地数据库没有用户,我应该使用 REST API 获取用户。

class Presenter {
    val mDisposable = CompositeDisposable()

    override fun getUsersFromLocal() {
        Log.d("TAG", "This is called just one")
        mDisposable.add(localDatabase.userDao().getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { // This "subscribe" is the problem. Here is called multiple......
                Log.d("TAG", "This subscribe is called multiple, It called more than 10 times")
                if (it.isEmpty()) {
                    secondCall()
                } else {
                    view.onUsersLoaded(it)
                }
            })
    }

    override fun getUsersFromRemote() {
        mDisposable.add(restApi.getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                saveLocal(it) // If I remove just this line, subscribe is called once. (works fine)
                view.onUsersLoaded(it)
            })
    }
}

我正在使用嵌套的(?) RxJava。 不知道为什么firstCall的subscribe方法被调用了多个... 如果我删除“secondCall()”方法和逻辑,“subscribe”只会调用一次。


我找到了一条线索。 “saveLocal”方法在后台线程中运行。

    private fun saveLocal(users: List<User>) {
        users.forEach {
            appExecutors.diskIo.execute {
                localDatabase.userDao().saveUser(it)
            }
        }
    }

我将上源代码更改为下。

    private fun saveLocal(users: List<User>) {
        appExecutors.diskIo.execute {
            localDatabase.userDao().saveUsers(users)
        }
    }

更改后,“订阅”只被调用了两次。 是的,它仍然有问题。 但在此之前,“订阅”被多次调用。 (我认为它被称为“用户”大小。)


我也添加了“UserDao 类”和“RestApi 类”代码。

import android.arch.persistence.room.*
import io.reactivex.Flowable
import io.reactivex.Single

@Dao
interface UserDao {

    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Flowable<List<User>>

}

这里是“RestApiService 类”。

import io.reactivex.Observable

interface RestApiService {
    @Headers(API_HEADER_AUTHORIZATION, API_HEADER_ACCEPT)
    @GET("/users")
    fun getUsers(): Observable<UserList> // UserList class has a list of the user
}

【问题讨论】:

  • 提供的代码并不意味着您会收到两次firstCall 的调用,因此您的原始代码可能存在错误。您调用了两次firstCall,或者您在secondCall.subscribe { } 调用中出现了复制粘贴错误,该调用调用了firstCall
  • 您的代码运行良好。查找所有 firstCall 用法,您可能在代码中某处调用了 firstCall 两次。

标签: android kotlin rx-java rx-java2


【解决方案1】:

对于您的案例,您需要使用“Maybe”而不是“Flowable”。

使用 Flowable,如果您再次添加新数据或更新之前插入的对象,Flowable 对象将自动发出并再次调用 subscribe 方法:

此更改可能会修复您的代码:

@Dao
interface UserDao {

    @Query("SELECT * FROM user ORDER BY id DESC")
    fun getUsers(): Maybe<List<User>>

}

有关 Flowable、Single 和 Maybe 的更多信息,请参阅下面的文章” Room implementation with RxJava

【讨论】:

    【解决方案2】:

    您应该使用 switchIfEmpty 运算符,而不是在订阅中使用 if 语句。基本上你做了两个订阅

    【讨论】:

    • 虽然这是一个很好的建议,但这并不是问题的解决方案
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-07
    • 2021-09-06
    • 2018-01-01
    • 2021-09-02
    • 2011-11-21
    • 1970-01-01
    相关资源
    最近更新 更多