【问题标题】:Asynchronous read / write to Realm using RXJava 2使用 RXJava 2 对 Realm 进行异步读/写
【发布时间】:2017-11-02 14:53:04
【问题描述】:

我第一次使用RXJava 2实现异步操作

目标:

使用库Retrofit2 从服务器获取json 数据。如果成功,则将数据写入Realm,并在记录后立即将数据取回并发送到RecyclerView的适配器。

所以,我是这样意识到这一切的:

private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {

    String accessToken = accessDataModel.getAccessToken();

    MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<ChatsModel>() {
                @Override
                public void onNext(ChatsModel chatsModel) {
                    if (chatsRepository.hasData()) {

                        chatsRepository.updateChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        Log.d(TAG, "DO, onSuccess updated!");
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        Log.d(TAG, "DO, onError when update!");
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                    }

                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });

                    } else {
                        chatsRepository.insertChatsData(chatsModel)
                                .subscribe(new DisposableObserver<ChatsModel>() {
                                    @Override
                                    public void onNext(ChatsModel localChatsModel) {
                                        iGetChatsCallback.onGetChatsSuccess(localChatsModel);
                                        Log.d(TAG, "DO, onSuccess inserted!");
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        iGetChatsCallback.onGetChatsError(e.getMessage());
                                        Log.d(TAG, "DO, onError when inserting!");
                                    }

                                    @Override
                                    public void onComplete() {
                                        dispose();
                                        Log.d(TAG, "DO, onComplete!");
                                    }
                                });
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.d(TAG, "onError" + t.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}

我在订阅者MyApplication.getRestApi().GetChats()onNext()方法中写入Realm中的数据

这里是入口代码:

public Observable<ChatsModel> updateChatsData(final ChatsModel chatsModel) {

    return Observable.create(new ObservableOnSubscribe<ChatsModel>() {
        @Override
        public void subscribe(ObservableEmitter<ChatsModel> e) throws Exception {
            if (chatsModel != null) {
                realm.executeTransactionAsync(
                        realm -> realm.copyToRealmOrUpdate(chatsModel),
                        () -> {
                            Log.d(LOG_TAG, "Data success updated!");
                            ChatsModel localChatsModel = getAllChatsData();
                            e.onNext(localChatsModel);
                            e.onComplete();
                        },
                        error -> {
                            Log.d(LOG_TAG, "Update data failed!");
                            e.onError(error);
                        });
            }

        }
    });

}

updateChatsData() 异步写入并在另一个类中声明。

如您所见,fetchChatsFromNetwork() 方法写起来很麻烦,或者在我看来是这样

问题:

无论我做对与否,如果不对,那怎么会是对的?

【问题讨论】:

  • 你现在把事情复杂化了,这是肯定的。

标签: java android realm rx-java rx-android


【解决方案1】:
private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) {    
    String accessToken = accessDataModel.getAccessToken();
    Single<ChatsModel> chats = MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version);
    chats.doOnNext((chats) -> {
        chatsRepository.insertOrUpdate(chats);
    }).subscribeOn(Schedulers.io())
    .subscribe();
}

public void updateChatsData(final ChatsModel chatsModel) {
    try(Realm realm = Realm.getDefaultInstance()) {
        realm.executeTransaction(r -> {
            r.insertOrUpdate(chatsModel);
        });
    }
}

public Flowable<List<ChatsModel>> getAllChatsData(Realm realm) {
    RealmQuery<ChatsModel> query = realm.where(ChatsModel.class);
    if(realm.isAutoRefresh()) {
        return query.findAllAsync().asFlowable().filter(RealmResults::isLoaded);
    } else {
        return Flowable.just(query.findAll());
    }
}

【讨论】:

  • 但是他们的订阅者呢。它们应该如何工作?
  • 只有 Flowable 需要订阅者,在这种情况下(使用 RealmResults)你会更新任何你需要更新的东西,严格保存在 UI 线程中
  • 不太明白,能不能再详细点?
  • 你上面写的所有代码都应该换成我写的。
猜你喜欢
  • 2011-10-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-09-02
  • 2017-10-27
  • 2015-09-19
  • 1970-01-01
相关资源
最近更新 更多