【问题标题】:Rxjava 3 + Retrofit2 - multiple inserts to DB problemRxjava 3 + Retrofit2 - 多次插入数据库问题
【发布时间】:2020-05-09 23:00:51
【问题描述】:

我正在尝试执行以下操作;使用 Retrofit 将云数据库同步到设备上的本地 SqLite 数据库(房间)。 DB 可能会变大,大约 100,000 个寄存器或更多,因此同步过程可能需要一些时间。所以它发送第一个Retrofit请求来获取寄存器的数量,这样它就可以计算总页数,然后它会发送多个Retrofit Request,从API中获取所有数据,每次请求后,它将数据保存到房间。

现在,我在组合两个 RxJava 调用或进程时遇到了麻烦,同样在第二个 RxJava 进程上,在 Retrofit 调用之后,有一个对象列表的房间插入,但是在孔进程结束后,我请注意,并非 100% 的所有记录都被插入,每次我运行该过程时,插入的记录数都会发生变化,大约是 80% - 98%,但从来没有 100%,即使所有的 Retrofit 调用都已发送。

请帮助我:

  1. 如何在一次 RxJava 调用中完成所有进程,而不是像我拥有的​​那样 2 现在?
  2. 如何将 100% 的记录插入到 Room?

按照代码:

分级

def room_version = "2.2.5"
//RxJava 2
implementation "io.reactivex.rxjava2:rxjava:2.2.19"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 2
implementation "com.squareup.retrofit2:adapter-rxjava2:2.8.1"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//Room
implementation "androidx.room:room-runtime:$room_version"
annotationProcessor "androidx.room:room-compiler:$room_version"
//RxJava support for Room
implementation "androidx.room:room-rxjava2:$room_version" 

ItemSyncDetails

...
public class ItemSyncDetails {
    @SerializedName("CurrentPage")
    int currentPage;
    @SerializedName("PageCount")
    int pageCount;
    @SerializedName("PageSize")
    int pageSize;
    @SerializedName("RecordCount")
    int recordCount;
    @SerializedName("Results")
    List<Item> mItemList;
...
}

物品道

注意:我没有使用 Observer/Flowable/Maybe/Single,因为我有 能够使其与 RxJava 一起工作

import io.reactivex.Flowable;

@Dao
public interface ItemDao {

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    long insert(Item item);

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    List<Long> insertAll(List<Item> items);
...

数据接口

import io.reactivex.rxjava3.core.Observable;
...

public interface DataApi {

    @GET("item")
    Observable<ItemSyncDetails> getItemsByPage(
            @Query("pageSize") Integer pageSize,
            @Query("currentPage") Integer currentPage,
            @Query("sortBy") Integer sortBy
    );

项目库

import io.reactivex.Observable;
    ...

    public class ItemRepository {
    ...

        public ItemRepository(Application application) {
            mDataApi = RetrofitClient.getRetrofitInstance("http://192.168.1.100").create(DataApi.class);
            RfidDatabase db = RfidDatabase.getAppDatabase(application);
            itemDao = db.itemDao();
            itemList = itemDao.getAllItems();
            inserts = 0;
        }

        public List<Long> insertAllLocal (List<Item> itemList) {
            List<Long> items = itemDao.insertAll(itemList);
            inserts += items.size();
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + inserts + "*************");
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + items);
            return items;
        }

        public Observable<ItemSyncDetails> getRecordsCount(){
            return mDataApi.getItemsByPage(1,1,1);
        }

        public Observable<ItemSyncDetails> getItemsPerPage(int pageSize,int currentPage){
            return mDataApi.getItemsByPage(pageSize,currentPage,1);
        }
    ...

SyncConfigFragment 

    import io.reactivex.Observable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedule
    ...

    public class SyncConfigFragment extends Fragment {


        private ItemViewModel itemViewModel;
        private ImageView imageSyncItems;
        private ProgressDialog progressDialog;
        private TextView tvSyncDescriptionItems;
        private DataApi service;
        private ItemSyncDetails mItemSyncDetails;
        private List<Item> mItemlist;
        private CompositeDisposable mCompositeDisposable;
        private int mNumPages;
        private int syncProgress;
        ...

        @Override
        public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
            View view =  inflater.inflate(R.layout.fragment_config_sync,container, false);
            progressDialog = new ProgressDialog(getActivity());
            sharedPref = getActivity().getSharedPreferences(
                    getString(R.string.sharepref_filename), Context.MODE_PRIVATE);
            mItemlist = new ArrayList<Item>();
            mCompositeDisposable = new CompositeDisposable();
            itemViewModel = ViewModelProviders.of(this).get(ItemViewModel.class);
            tvSyncDescriptionItems = view.findViewById(R.id.tvDescriptionSyncItems);
            if(sharedPref.contains("last_sync_item")) {
                tvSyncDescriptionItems.setText("Última actualización " + sharedPref.getString("last_sync_item",""));
            } else{
                tvSyncDescriptionItems.setText("No se ha Sincronizado");
            }
            imageSyncItems = view.findViewById(R.id.imageViewSyncItems);
            imageSyncItems.setOnClickListener(clickListener);
            return view;
        }

        private View.OnClickListener clickListener = new View.OnClickListener() {
            public void onClick(View v) {
                    if (v.equals(imageSyncItems)) {
                //If I uncomment the next line it does not work
                        //mCompositeDisposable.add(
                        mNumPages = 0;
                        syncProgress = 0;
                        showProgressDialog("Items");
                        getRecordsCount();
                       //); Closing round bracket for mCompositeDisposable
                }
            }
        };//End View.OnClickListener 

        private void getRecordsCount(){
            itemViewModel.getRecordsCount()
                    .subscribeOn(Schedulers.io())
                    .retry(3)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
        }

        private void HandleResults(ItemSyncDetails itemSyncDetails) {
            this.mItemSyncDetails = itemSyncDetails;
            int pageSize = 100;
            int numPages = itemSyncDetails.getRecordCount()/pageSize;
            if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
                numPages++;
            }
            this.mNumPages = numPages;
        }

        private void getNumPagesHandlerComplete() {
            getAllRecords(mNumPages);
        }

        private void handleError(Throwable throwable) {
            tvSyncDescriptionItems.setText("**********Error de conexión...");
            closeProgressDialog();
        }

        private void getAllRecords(int numPages){
            //numPages: total of pages are the number of times to send the request to API
            Observable.range(1, numPages)
                    .flatMap(i -> itemViewModel.getItemsPerPage(100,i))
                    .map(new Function<ItemSyncDetails, Integer>() {
                        @Override
                        public Integer apply(ItemSyncDetails itemSyncDetails) throws Throwable {
                            return itemViewModel.insertAllLocal(itemSyncDetails.getItemList()).size();
                        }
                    })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
        }

        private void getAllHandleResults(Integer i) {
            progressDialog.setProgress(getProgress(i));
        }

        private void handleComplete() {
            //last request finished
            closeProgressDialog();
        }

        private int getProgress(int newItems){
            syncProgress += newItems;
            int progress = 0;
            if (syncProgress == mItemSyncDetails.getRecordCount()){
                progress = 100;
            } else {
                progress = (100 * syncProgress)/mItemSyncDetails.getRecordCount();
            }
            return progress;
        }
    ...
    }

http://192.168.1.10:82/api/v1.0/item?pageSize=1&currentPage=1&sortBy=1

注意:页面大小可能会改变,我使用的是 100 的固定大小 每页的项目。

{
  Results: [
  {
    epc: "202020202020202030303031",
    barcode: "0001",
    name: "Televisor Samnsung",
    description: "0001",
    creation_date: "2020-02-26T10:55:06",
    last_update: "2020-02-26T10:55:06",
    last_seen: "2020-02-26T10:55:06",
    brand: "Samnsung",
    serial_number: "0001",
    parent: "",
    fk_category: 1,
    responsable: "",
    purchase_date: "2020-02-26T10:55:06",
    cost: 0,
    fk_location: 1008,
    fk_item_state: 1,
    inventory_date: "2020-02-26T10:55:06"
  }
 ],
 CurrentPage: 1,
 PageCount: 65565,
 PageSize: 1,
 RecordCount: 65565
}

【问题讨论】:

  • 这里有很多代码。您能否只发布相关代码并删除不相关的代码?另外,您能否改写您要执行的操作?下面是一个例子:“下载一个包含 1000 个项目的列表,分页,当每页项目下载时,将它们保存在本地数据库中,并在保存所有页面后,更新视图”。
  • 嗨@sonnet,问题已编辑,谢谢。

标签: android retrofit2 android-room rx-java3


【解决方案1】:

您在编辑之前在此处发布了 json 响应。

    CurrentPage: 1,
    PageCount: 65566,
    PageSize: 1,
    RecordCount: 65566

如果我理解正确,那么您有 65k 个项目和每页 1 个项目。这意味着 65k 个页面,这意味着 65k 个网络调用。好多啊。你可以先改进这个设计。

  1. 将整个记录分成几页(甚至可能是 10 或 20 页)。如果整个记录有成千上万个项目,则 1 页仍将有数千个项目。
  2. 然后使用 gzip 压缩来压缩每个页面的 json 响应并从服务器提供。或者不要将记录分成页面,并在一个使用 gzip 压缩的响应中将它们全部传递(如果它不是那么大)。
  3. 在 android 上解压响应,解析它,然后做任何你想做的事情。

这样可以减少大量的网络调用,并可能减少同步的等待时间。

至于您的实际 rx 问题:

val pageSize = 100
viewModel.getRecordsCount()
    .map {
        // logic from `HandleResults` function
        // do some calculation
        var numPages: Int = it.records / pageSize
        if (it.records < pageSize || it.records % pageSize != 0) {
            numPages++
        }
        return@map numPages
    }
    .flatMap { pages -> Observable.range(1, pages) }
    .flatMap { page -> viewModel.getItemsPerPage(pageSize, page) }
    .flatMap { itemSyncDetails ->
        val items = viewModel.insertAllLocal(itemSyncDetails.getItemList())
        return@flatMap Observable.just(items.size)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(....)

我注意到并非 100% 的所有记录都被插入,每次我运行该进程时,插入的记录数量都会发生变化,大约是 80% - 98%,但从来没有 100%,即使所有的改造呼叫已发送。

handleError函数中记录错误,看看实际问题是什么。

【讨论】:

  • 感谢您重播@sonnet,我再次编辑问题并放入json,如果您看到url,还有一个参数pagesize可以更改页面大小,我正在做的是;我首先发送一个pagesize = 1的请求rxJava-retrofit,只是为了知道元素的总大小并计算pagesize = 100的总页数,然后我开始使用从1到total的范围发送rxJava-Retrofit请求Number Pages,然后在每次请求之后,我将从 API 获得的 List-of-Items 插入到 Room,我希望现在更清楚了。
  • 根据数据库错误,我使用的是 HTTP 记录器,但我没有在日志中看到任何错误,我唯一注意到的是我没有看到所有页面的所有 HTTP 请求,我也没有看到所有数据库插入日志,但没有看到错误,我不确定为什么会这样,我认为它无法处理所有日志,所以它跳过了其中一些,有什么想法吗?
  • 我做了一些基准测试。假设您的记录项目看起来像 json 中发布的项目并且您有 65k 项目,那么总 json 大小将为 31.85 MB。如果您对该文件应用 gzip 压缩,则它只有 139 KB。如果想法是将本地数据库与服务器同步,那么我建议您以压缩格式一次发送整个内容,然后解压缩,解析 json,然后插入列表
  • 谢谢你的重播,所以你说的是;在 API 回复服务器之前,它会 gzip json 响应,然后在设备上,它会获取响应,解压缩并将其保存到数据库,对吗?所以我的下一个问题是我从哪里开始?你能推荐一个教程来做到这一点吗?我正在使用服务器上的 API C#.Net 开发 Android-Java。
  • 你可能也想看看这个stackoverflow.com/questions/61741592/… 浏览android room 的文档,看看插入或批量插入是否有任何限制。这可能是数据库方面的问题。至于服务器,只需谷歌一些基本的使用 c# 的 gzip 压缩和使用 kotlin/java 的 gzip 解压缩和改造。网上有很多例子。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-10-04
  • 1970-01-01
  • 2021-03-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多