【问题标题】:Multiple android api calls using RxJava使用 RxJava 的多个 android api 调用
【发布时间】:2026-02-15 17:25:03
【问题描述】:

我有一个对象列表,这些对象在每个 api 调用中作为 post 请求的主体发送。所有发布请求都应该并行运行。对于每个请求,它应该通知它是成功还是失败,并且一旦所有发布请求完成,它应该通知。如何使用 rxjava 实现这一目标

请浏览此代码。这是实现吗?

    FlashSaleRetrofitService retrofitService = flashSaleProvider.createRetrofitService();
    FlashSale runningFlashSale = JumkeyApplication.getRunningFlashSale();

    List<Observable<Response<ClaimWrapper>>> wrapperResponseList = new ArrayList<>();

    for (final CartProduct cartProduct : cartProducts) {

        final Claim claim = new Claim();

        Customer customer = customerDataProvider.getCustomer();
        claim.setCustomerId(String.valueOf(customer.getId()));
        claim.setEmail(customer.getEmail());
        claim.setMobileNumber(customer.getDefaultAddress().getPhone());
        claim.setProductId(Long.parseLong(cartProduct.getProductId()));
        claim.setVariantId(Long.parseLong(cartProduct.getVariantId()));
        claim.setQuantity(cartProduct.getQuantity());
        claim.setSaleId(runningFlashSale.getSaleId());

        wrapperResponseList.add(retrofitService.claimProduct(claim));
    }

    Observable.zip(wrapperResponseList, new FuncN<List<Claim>>() {
        @Override
        public List<Claim> call(Object... args) {
            return null;
        }
    }).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Action1<List<Claim>>() {
                @Override
                public void call(List<Claim> claims) {

                }
            })
            .compose(new Observable.Transformer<List<Claim>, Claim>() {
                @Override
                public Observable<Claim> call(Observable<List<Claim>> listObservable) {
                    return null;
                }
            })
            .subscribe(new Observer<Claim>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Claim claim) {

                }
            });

如果这不是正确的方法,请建议我如何解决上述问题。

【问题讨论】:

    标签: android networking retrofit rx-java


    【解决方案1】:

    您正在寻找Zip 方法。

    请参阅 RxJava documentation

    【讨论】:

    • 谢谢,我去看看文档
    【解决方案2】:

    如果您想要所有完成后的处理结果。

    Observable
            .zip(observables, new FuncN<List<Response<ClaimWrapper>>>() {
                @Override
                public List<String> call(Object... args) {
                    List<Response<ClaimWrapper>> result = new ArrayList<>(args.length);
    
                    for (Object arg : args) {
                        result.add((Response<ClaimWrapper>) arg);
                    }
                    return result;
                }
            })
            .subscribe(new Subscriber<List<Response<ClaimWrapper>>>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(List<Response<ClaimWrapper>> results) {
    
                }
            });
    

    如果你想一个一个的处理

    Observable
            .merge(observables)
            .subscribe(new Subscriber<Response<ClaimWrapper>>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Response<ClaimWrapper> result) {
    
                }
            });
    

    【讨论】:

    • 感谢您的帮助。这样做的一个问题是,如果一个请求失败,剩余的请求不会执行。即使先前的请求失败,如何让它们执行。
    • 我很努力,但没有得到预期的结果。请任何人帮助我解决这个问题。
    • onError 只能调用一次。所以组合可观察中的每个错误都会破坏所有链。如果发生故障,接收其他请求的唯一方法是使用 onErrorResumeNext() 或 onErrorReturn() 暂停错误