【问题标题】:Using RxJava to join local data with remote ( or cached ) data使用 RxJava 将本地数据与远程(或缓存)数据连接起来
【发布时间】:2016-02-20 15:04:15
【问题描述】:

这是有效的代码,但我有几个问题以及关于改进它的建议请求。我是 RxJava 的新手,我还没有完全理解如何将这些类型的 observables 链接在一起。

我有两个模型对象,ListItemUserInfoListItems 存在于本地数据库中,UserInfo 是使用ListItem 提供的 ID 从服务器获取的。

UserInfo Web 服务接受一个 ID 数组,并将为其返回一个 UserInfo 对象列表。

这段代码的流程如下:

  1. 从数据库加载ListItems
  2. 使用从数据库中获取的ListItems,检查内存缓存以查看我是否已经获取了特定ListItemUserInfo
  3. 对于没有缓存UserInfo的任何项目,从网络中获取它们
  4. 将获取的UserInfo对象放入缓存中
  5. 重新运行第2步(方法为loadCachedUserInfo
  6. 将结果返回给订阅者

注意:如果列表已被视为 isUserList,则应仅获取 ListItemUserInfo 对象。

代码如下:

fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
        }
        return@flatMap Observable.just(listItems)
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false)
        }
        return@flatMap Observable.just(listItems)
    }
}

fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        for ( listItem in listItems ) {
            listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()]
        }
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }
}

fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> {
    val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
    val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
    val records = hashMapOf("records" to ids)
    if ( itemsToFetch.count() == 0 ) {
        return Observable.just(listItems)
    }
    return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
            .map { json ->
                val recordsArray = json.arrayValue("records")
                for ( i in 0..recordsArray.length() - 1) {
                    val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i))
                    coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
                    coreUserMap[coreUserInfo.userID] = coreUserInfo
                    coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
                }
                return@map listItems
            }.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) }
}

用户将通过调用来启动事件序列:

ListController.itemsInList(list)

我对这段代码的问题是:

  1. 目前loadCachedUserInfo 接收ListItem 的数组,并在缓存项与其关联后返回相同的数组作为可观察对象。这对我来说感觉不对。我认为这个调用应该只返回与它关联的缓存UserInfo 的项目。但是,我需要继续将ListItem 的完整数组传递给下一个方法

2.) 我需要做额外的工作来支持退订吗?

3.) 这是类似的问题 1。我的 fetchUserInfoForListItems 接受一个列表项数组,并在它们被获取并通过缓存方法重新运行后返回一个具有相同列表项数组的 observable。这对我来说也是不正确的。我宁愿这个方法为获取的对象返回一个Observable&lt;List&lt;UserInfo&gt;&gt;。我不明白如何在itemsInList 中将ListItems 与新获取的UserInfo 相关联并返回那些ListItems 的Observable。

编辑:写完这篇文章后,它帮助我意识到了一些事情。我可以 flatMap 将我的调用包装在一个 Observable.create 中,它可以包含我想从我的fetchUserInfoForListItems 中提取的智能,让我解决问题 #3。这是更新的代码:

 fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
        }
        return@flatMap Observable.just(listItems)
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap Observable.create<List<ATListItem>> { subscriber ->
                fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList ->
                    for (coreUserInfo in userInfoList) {
                        coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
                        coreUserMap[coreUserInfo.userID] = coreUserInfo
                        coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
                    }
                }.flatMap {
                    loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
                }.subscribe {
                    subscriber.onNext(listItems)
                    subscriber.onCompleted()
                }
            }
        }
        return@flatMap Observable.just(listItems)
    }
}

fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] }
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }
}

fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> {
    val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
    val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
    val records = hashMapOf("records" to ids)
    if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) }
    return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
            .map { json ->
                val userInfo = ArrayList<CoreUserInfo>()
                json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) }
                return@map userInfo
            }
}

【问题讨论】:

    标签: android rx-java kotlin rx-kotlin


    【解决方案1】:
    1. 目前 loadCachedUserInfo 接收一个 ListItem 数组,并在缓存项与其关联后返回相同的数组作为 observable。这对我来说感觉不对。我认为这个调用应该只返回具有与之关联的缓存 UserInfo 的项目。但是,我需要继续将 ListItem 的完整数组传递给下一个方法

    我不确定我是否正确理解你,但如果你只需要副作用(缓存),你可以使用doOnNext。例如,

    .doOnNext { listItems ->
        if ( list.isUserList ) {
            cache(listItems, userIDIndex = list.userIDIndex!!)
        }
    }
    
    fun cache(listItems : List<ATListItem>, userIDIndex : Int) {
        // caching
    }
    
    1. 我需要做额外的工作来支持退订吗?

    不,AFAIK。

    注意:

    更多关于doOnNext的信息可以在What is the purpose of doOnNext(...) in RxJavahere找到

    如果 lambda 中的最后一条语句是表达式,通常不需要 return@...。 例如:

    .flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
        }
        return@flatMap Observable.just(listItems)
    }    
    

    可以写成:

    .flatMap { listItems ->
        if ( list.isUserList )
            loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
        else
            Observable.just(listItems)
    } 
    

    我没有测试代码。

    【讨论】:

    • 谢谢,doOnNext 绝对是缺失的部分。我不再需要让我的缓存存储和查找一个可观察的方法,因为我可以在 doOnNext 中完成它们,这简化了一切。
    猜你喜欢
    • 1970-01-01
    • 2017-05-07
    • 2012-06-26
    • 1970-01-01
    • 2013-01-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-22
    相关资源
    最近更新 更多