【问题标题】:How do I place Asynchronous Retrofit calls using rxjava. I have to place over a 100 calls asynchronously如何使用 rxjava 进行异步改造调用。我必须异步拨打 100 多个电话
【发布时间】:2020-08-17 07:09:58
【问题描述】:

这是我一直在处理的代码示例

items 包含 100 个元素,因此使用同步调用获取数据会占用大量时间。有人可以建议一种方法来提高此操作的速度,从而减少时间。 目前这需要 15-20 秒来执行。我是 rxjava 的新手,因此请尽可能提供此问题的详细解决方案。 dataResponses 包含 100 个项目中每个项目的 RouteDistance 对象。

for(int i = 0 ; i<items.size();i++){

    Map<String, String> map2 = new HashMap<>();

    map2.put("units", "metric");
    map2.put("origin", currentLocation.getLatitude()+","+currentLocation.getLongitude());
    map2.put("destination", items.get(i).getPosition().get(0)+","+items.get(i).getPosition().get(1));
    map2.put("transportMode", "car");
    requests.add(RetrofitClient4_RouteDist.getClient().getRouteDist(map2));
}

Observable.zip(requests,  new Function<Object[], List<RouteDist>>() {
    @Override
    public List<RouteDist> apply(Object[] objects) throws Exception {
        Log.i("onSubscribe", "apply: " + objects.length);
        List<RouteDist> dataaResponses = new ArrayList<>();
        for (Object o : objects) {
            dataaResponses.add((RouteDist) o);
        }
        return dataaResponses;
    }
})
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(
                new Consumer<List<RouteDist>>() {
                    @Override
                    public void accept(List<RouteDist> dataaResponses) throws Exception {
                        Log.i("onSubscribe", "YOUR DATA IS HERE: "+dataaResponses.toString());
                        recyclerViewAdapter_profile = new RecyclerViewAdapter_Profile(items,dataaResponses);
                        recyclerView.setAdapter(recyclerViewAdapter_profile);
                    }
                },

                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        Log.e("onSubscribe", "Throwable: " + e);
                    }
                });

【问题讨论】:

  • 如果 rxjava 没有帮助,你为什么要使用它?您不能只进行并行改造电话吗?
  • 对不起@AlexeiKaigorodov,你能解释一下你的评论有什么帮助吗?您是否考虑过问题创建者是 RxJava 的新手并且不完全了解其所有运算符的情况?如果您有其他解决方案如何解决此问题,请提供答案。
  • 我的意思是,进行异步调用是一项任务,而使用 rxjava 则完全是另一项任务。此任务需要不同的问题。我建议先解决第一个任务,然后询问如何将工作代码转换为 rxjava,希望第二个问题根本不会被问到,因为一切都已经工作了。

标签: java android rx-java retrofit2 rx-java2


【解决方案1】:

API

interface Client {
    Observable<RouteDist> routeDist();
}

final class RouteDist {

}


final class ClientImpl implements Client {
    @Override
    public Observable<RouteDist> routeDist() {
        return Observable.fromCallable(() -> {
            // with this log, you see, that each subscription to an Observable is executed on the ThreadPool
            // Log.e("---------------------", Thread.currentThread().getName());
            return new RouteDist();
        });
    }
}

通过 subscribeOn 应用线程

final class ClientProxy implements Client {
    private final Client api;
    private final Scheduler scheduler;

    ClientProxy(Client api, Scheduler scheduler) {
        this.api = api;
        this.scheduler = scheduler;
    }

    @Override
    public Observable<RouteDist> routeDist() {
        // apply #subscribeOn in order to move subscribeAcutal call on given Scheduler
        return api.routeDist().subscribeOn(scheduler);
    }
}

安卓测试

@Test
public void name() {
    // CachedThreadPool, in order to avoid creating 100-Threads or more. It is always a good idea to use own Schedulers (e.g. Testing)
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, 10,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    // wrap real client with Proxy, in order to move the subscribeActual call to the ThreadPool
    Client client = new ClientProxy(new ClientImpl(), Schedulers.from(threadPool));

    List<Observable<RouteDist>> observables = Arrays.asList(client.routeDist(), client.routeDist(), client.routeDist());

    TestObserver<List<RouteDist>> test = Observable.zip(observables, objects -> {
        return Arrays.stream(objects).map(t -> (RouteDist) t).collect(Collectors.toList());
    })
            .observeOn(AndroidSchedulers.mainThread())
            .test();

    test.awaitCount(1);

    // verify that onNext in subscribe is called in Android-EventLoop
    assertThat(test.lastThread()).isEqualTo(Looper.getMainLooper().getThread());
    // verify that 3 calls were made and merged into one List
    test.assertValueAt(0, routeDists -> {
        assertThat(routeDists).hasSize(3);
        return true;
    });
}

进一步阅读:

http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

注意: 不建议一次同时调用 API 100 次。此外,当使用 Zip 时,当你有一个足够大的 ThreadPool 时,这实际上会发生。当一个 API 调用超时时,可能会为此 API 调用发出一个 onError。 onError 将进一步传播给订阅者。即使只有 API 调用失败,您也不会得到任何结果。建议使用一些 onErrorResumeNext 或其他一些错误处理运算符,以确保一次 API 调用不会取消整体结果。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-03-24
    • 1970-01-01
    • 1970-01-01
    • 2016-02-08
    • 1970-01-01
    • 2016-08-12
    • 1970-01-01
    • 2014-12-09
    相关资源
    最近更新 更多