【问题标题】:rxjava2 - how to block until a call back completes and then do a retrofit call?rxjava2 - 如何阻塞直到回调完成然后进行改造调用?
【发布时间】:2018-01-05 05:09:13
【问题描述】:

IM 在使用 BlockingObservable 时遇到了难题。我有一种情况,我需要回调完成,完成后我可以进行改造 API 调用来获取数据。具体来说,我需要先初始化一个支付网关 sdk,然后在它成功初始化后,我将进行改造调用。这是我目前所拥有的:

  Observable.fromCallable(new Callable<PaymentStrategy>() {

                    @Override
                    public PaymentStrategy call() throws Exception {
                        return gatewayFactory.getPaymentStrategy("US");
                    }}).flatMap(new Function<PaymentStrategy, ObservableSource<PaymentStrategy>>() {
                    @Override
                    public ObservableSource<PaymentStrategy> apply(@NonNull final PaymentStrategy paymentStrategy) throws Exception {
                        return Observable.fromCallable(new Callable<PaymentStrategy>() {
                            @Override
                            public PaymentStrategy call() throws Exception {


        /*here is important. i want it to block until init actually
     gets a call back. when it does the subscriber will call
 onComplete and the observable should move forward at that point*/


                                paymentStrategy.init(paymentInitSubscriber);
                                return paymentStrategy;
                            }
                        });
                    }}).observeOn(AndroidSchedulers.mainThread())
                        .subscribeOn(AndroidSchedulers.mainThread())
                        .subscribe(paymentInitSubscriber);

似乎 rxjava2 没有 toBlocking() 调用,但我确实找到了 toBlockingFirst() 等和 BlockingObservable 类。但我仍然不确定如何完成任务。所以要清楚,当我调用 paymentStrategy.init() 时,我需要 observable 阻塞,直到订阅者 onComplete 或 onNext 被调用。我将订阅者作为参数传递,以便回调知道在完成时调用它。有什么想法吗?

【问题讨论】:

    标签: rx-java rx-java2


    【解决方案1】:

    我发现我最好使用 Flowable 并为其提供发射器。然后我可以发出 onNext 和 onComplete 等事件。

    final PaymentStrategy paymentStrategy = gatewayFactory.getPaymentStrategy("US");
    
                FlowableOnSubscribe flowableOnSubscribe = new FlowableOnSubscribe() {
                    @Override
                    public void subscribe(FlowableEmitter e) throws Exception {
                        FlowableEmitter initdownFlowableEmitter = e;
                        paymentStrategy.init(e);
        /* above i pass in the emitter and i can call onNext when the call back i want completes. this pushes the stream forward to the next one below of the retrofit call */
                    }
                };
    
                final Flowable flowable = Flowable.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
                return flowable.flatMap(new Function() {
                    @Override
                    public Object apply(@NonNull Object o) throws Exception {
                        PaymentApi service = mRetrofit.create(PaymentApi.class);
                        return service.getCards();
                    }
                }).toObservable();
    

    并且不要忘记在改造类中使用 Flowable 而不是 observable。

    根据 github 注释 here,似乎 fromAsync 已重命名为 Flowable.create:

    实际上,1.x 的 fromEmitter(以前的 fromAsync)已重命名为 Flowable.create。

    【讨论】:

      猜你喜欢
      • 2016-05-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-07-30
      • 2010-12-04
      相关资源
      最近更新 更多