【问题标题】:RxJava: Split observable into two listsRxJava:将 observable 拆分为两个列表
【发布时间】:2016-12-02 14:52:52
【问题描述】:

用例: 我必须更新存储在本地数据库中的当前订单。步骤是:

  1. 从后端下载数据(~800 项)
  2. 检查本地数据库是否已包含项目。后端人员使用字符串作为主键。
  3. 如果该项目不在数据库中,请添加它。
  4. 如果项目在数据库中,请更新它。

我的第一个解决方案很容易做到。只需调用后端并为每个项目创建两个数据库查询。首先检查它是否已经存在,然后添加或更新它。

正如您想象的那样,它很慢。每次更新大约需要 17.5 秒。

第二种解决方案: 将数据库数据缓存到一个列表中,而不是每次在ArrayList中搜索时都要查询数据库。这导致更新时间下降到 16.5 秒。我使用defer 从数据库调用创建Observable 并使用combineLatest 获取结果。

当前解决方案: 瓶颈当然是数据更新。我使用的 orm 库支持批量更新。所以我必须创建两个列表:DataToUpdateDataToInsert

当前解决方案运行时间约为 5.3 秒。我对时间感到满意,但对这样做的非反应性方式不满意。

Observable<List<OrderDto>> getAllOrdersObservable = backendService.getAllOrders();

ordersDisposable = Observable.combineLatest(getStoredOrder(), getAllOrdersObservable, this::insertOrUpdateOrders)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnComplete(() -> {
            Log.d(TAG, "Duration: " + (System.currentTimeMillis() - start) + " ms");
            showSyncButton();
            notifyListenersDataDownloaded();
        })
        .subscribe();


private Observable<List<Order>> getStoredOrder() {
    return Observable.defer(() -> {
        Log.d(TAG, "Started loading orders from database");
        Observable<List<Order>> just = Observable.just(orderDao.loadAll());
        Log.d(TAG, "Ended loading orders from database");
        return just;
    });
}

private List<Order> insertOrUpdateOrders(List<Order> orders, List<OrderDto> orderDtos) {
    List<Order> ordersToInsert = new LinkedList<>();
    List<Order> ordersToUpdate = new LinkedList<>();

    for (OrderDto orderDto : orderDtos) {
        Order order = getOrderByOrderNumber(orders, orderDto.getNumber());
        if (order != null) {
            dtoToEntityTransformer.updateFields(orderDto, order);
            ordersToUpdate.add(order);
        } else {
            order = dtoToEntityTransformer.transformToEntity(orderDto);
            ordersToInsert.add(order);
        }
    }

    orderDao.insertInTx(ordersToInsert);
    orderDao.updateInTx(ordersToUpdate);

    return orders;
}

问题

您知道如何以被动的方式解决这个问题吗?是否有一个运算符允许将 observable 拆分为两个列表。或者也许我应该使用全局变量(似乎是个坏主意)来保留要插入哪些数据以及要更新哪些数据的信息?

【问题讨论】:

    标签: rx-java rx-android rx-java2


    【解决方案1】:

    使用groupBy,这里的关键是是否更新/插入操作;你会得到一个包含密钥的 GroupedObservable。使用该键执行相应的操作。

    【讨论】: