【问题标题】:RxJava. Sequential executionRxJava。顺序执行
【发布时间】:2017-11-13 12:59:48
【问题描述】:

在我的 Android 应用程序中,我有一个处理用户交互的演示者,包含一种请求管理器,如果需要,可以通过请求管理器将用户输入发送到请求管理器。

请求管理器本身包含服务器 API 并使用此 RxJava 处理服务器请求。

我有一个代码,每次用户输入消息时都会向服务器发送一个请求并显示来自服务器的响应:

private Observable<List<Answer>> sendRequest(String request) {
    MyRequest request = new MyRequest();
    request.setInput(request);
    return Observable.fromCallable(() -> serverApi.process(request))
            .doOnNext(myResponse -> {
                // store some data
            })
            .map(MyResponse::getAnswers)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
}

但是现在我需要排队。用户可以在服务器响应之前发送一条新消息。队列中的每条消息都应按顺序处理。 IE。第二条消息将在我们收到第一条消息的响应后发送,以此类推。

如果发生错误,则不应处理进一步的请求。

我还需要在 RecyclerView 中显示答案。

我不知道如何更改上面的代码以实现上述处理

我看到了一些问题。一方面,用户可以随时更新此队列,另一方面,无论何时服务器发送响应消息都应从队列中删除。

也许我刚刚错过了一个 rxjava 运算符或特殊方式。

我在这里看到了类似的答案,但是,那里的“队列”是不变的。 Making N sequential api calls using RxJava and Retrofit

我会非常感谢任何解决方案或链接

【问题讨论】:

    标签: android rx-java rx-java2


    【解决方案1】:

    我没有找到任何优雅的原生 RxJava 解决方案。所以我会定制一个Subscriber 来做你的工作。

    为了你的 3 分:

    1. 对于顺序执行,我们创建一个单线程调度器

      Scheduler sequential = Schedulers.from(Executors.newFixedThreadPool(1));

    2. 为了在发生错误时停止所有请求,我们应该一起订阅所有请求,而不是每次都创建一个Flowable。所以我们定义了以下函数(这里我请求的是Integer和响应String):

      void sendRequest(Integer request)

      Flowable&lt;String&gt; reciveResponse()

      并定义一个字段来关联请求和响应流:

      FlowableProcessor&lt;Integer&gt; requestQueue = UnicastProcessor.create();

    3. 为了重新运行未发送的请求,我们定义了重新运行函数:

      void rerun()

    那么我们就可以使用它了:

    reciveResponse().subscribe(/**your subscriber**/)
    

    现在让我们实现它们。

    发送请求时,我们只是将其推送到requestQueue

    public void sendRequest(Integer request) {
      requestQueue.onNext(request);
    }
    

    首先,要按顺序执行请求,我们应该将工作安排到sequential

    requestQueue
      .observeOn(sequential)
      .map(i -> mockLongTimeRequest(i)) // mock for your serverApi.process
      .observeOn(AndroidSchedulers.mainThread());
    

    其次,在发生错误时停止请求。这是一种默认行为。如果我们什么都不做,一个错误将破坏订阅,并且不会发出任何其他项目。

    第三,重新运行未发送的请求。首先是因为原生操作符会取消流,比如MapSubscriber do (RxJava-2.1.0-FlowableMap#63):

    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);// fail will call cancel
        return;
    }
    

    我们应该包装错误。这里我使用我的Try 类来包装可能的异常,您可以使用任何其他可以包装异常的实现而不是抛出它:

        .map(i -> Try.to(() -> mockLongTimeRequest(i)))
    

    然后是自定义的OnErrorStopSubscriber implements Subscriber&lt;Try&lt;T&gt;&gt;, Subscription

    它正常请求并发出项目。当错误发生时(实际上是一个失败的Try 发出)它停在那里,甚至不会请求或发出下游请求。调用rerun 方法后,它会回到运行状态并正常发出。这门课大约80行。你可以在my github看到代码。

    现在我们可以测试我们的代码了:

    public static void main(String[] args) throws InterruptedException {
      Q47264933 q = new Q47264933();
      IntStream.range(1, 10).forEach(i -> q.sendRequest(i));// emit 1 to 10
      q.reciveResponse().subscribe(e -> System.out.println("\tdo for: " + e));
      Thread.sleep(10000);
      q.rerun(); // re-run after 10s
      Thread.sleep(10000);// wait for it complete because the worker thread is deamon
    }
    
    private String mockLongTimeRequest(int i) {
      Thread.sleep((long) (1000 * Math.random()));
      if (i == 5) {
        throw new RuntimeException(); // error occur when request 5
      }
      return Integer.toString(i);
    }
    

    和输出:

    1 start at:129
    1 done  at:948
    2 start at:950
        do for: 1
    2 done  at:1383
    3 start at:1383
        do for: 2
    3 done  at:1778
    4 start at:1778
        do for: 3
    4 done  at:2397
    5 start at:2397
        do for: 4
    error happen: java.lang.RuntimeException
    6 start at:10129
    6 done  at:10253
    7 start at:10253
        do for: 6
    7 done  at:10415
    8 start at:10415
        do for: 7
    8 done  at:10874
    9 start at:10874
        do for: 8
    9 done  at:11544
        do for: 9
    

    您可以看到它按顺序运行。并在发生错误时停止。调用rerun方法后,继续处理剩下的未发送请求。

    For complete code, see my github.

    【讨论】:

    • 感谢您的回答。我想,使用 Scheduler.single() 也可以吗?我的问题有两个要求:1.如果发生任何错误,停止所有请求。 2. 重新运行所有未发送的请求
    • @Tima 是的,但Schedulers.single 是全球共享的。对于这两个要求,将Observable sendRequest(request) 拆分为void sendRequest(request)Observable reciveResponse() 怎么样?我认为订阅逻辑是相似的。
    • 谢谢。我找到了一种带有一点数据结构操作的原生解决方案。我必须测试它,稍后会发布它
    【解决方案2】:

    对于这种行为,我正在使用 Flowable 背压实现。 为您的 api 请求流创建作为父级的外部流,使用 maxConcurrency = 1 对 api 请求进行平面映射并实施某种缓冲策略,因此您的 Flowable 不会抛出异常。

    Flowable.create(emitter -> {/* user input stream*/}, BackpressureStrategy.BUFFER)
                    .onBackpressureBuffer(127, // buffer size
                            () -> {/* overflow action*/},
                            BackpressureOverflowStrategy.DROP_LATEST) // action when buffer exceeds 127
                    .flatMap(request -> sendRequest(request), 1) // very important parameter
                    .subscribe(results -> {
                        // work with results
                    }, error -> {
                        // work with errors
                    });
    

    它将用户输入缓冲到给定的阈值,然后将其丢弃(如果不这样做会抛出异常,但用户不太可能超过这样的缓冲区),它将按顺序执行 1 x 1像一个队列。如果库本身有某种行为的运算符,请不要尝试自己实现此行为。

    哦,我忘了说,你的sendRequest() 方法必须返回 Flowable 或者你可以将它转换为 Flowable。

    希望这会有所帮助!

    【讨论】:

    • 感谢您的回答。你能再解释一下,什么是用户输入流?
    • 您通过某种事件启动 sendRequest,我假设此事件是用户输入,例如单击某些内容或在 EditText 中键入文本。在我的示例中,用户输入流或父流只不过是此类事件流经的流。您需要该父流才能按需要工作,否则您无法控制事件流顺序。
    • 你可以简单地在 Flowable.create 块内设置 TextWatcher,当文本改变时调用emitter.onNext()
    • 好的。我会尽力。谢谢
    • 刚刚修改了我的问题。我不确定我是否仍然可以通过您的解决方案解决我的问题
    【解决方案3】:

    我的解决方案如下(我之前在 Swift 中做过类似的事情):

    1. 您需要一个用于请求和响应的包装接口(我们称之为“事件”)。
    2. 您将需要一个包含请求队列和最新服务器响应的状态对象(我们将其设为“State”类),以及一个接受“Event”作为参数并返回“this”的方法。
    3. 您的主要处理链看起来像 Observable state = Observable.merge(serverResponsesMappedToEventObservable, requestsMappedToEventObservable).scan(new State(), (state, event) -> { state.apply(event) })时间>
    4. .merge() 方法的两个参数都可能是 Subjects。
    5. 队列处理将在“状态”对象的唯一方法中进行(在任何事件上从队列中挑选和发送请求,在请求事件上添加到队列中,在响应事件上更新最新响应)。

    【讨论】:

      【解决方案4】:

      我建议创建异步可观察方法,这里是一个示例:

      public Observable<Integer> sendRequest(int x){
          return Observable.defer(() -> {
              System.out.println("Sending Request : you get Here X ");
              return storeYourData(x);
          });
      }
      
      public Observable<Integer> storeYourData(int x){
          return Observable.defer(() -> {
              System.out.println("X Stored : "+x);
              return readAnswers(x);
          }).doOnError(this::handlingStoreErrors);
      }
      
      public Observable<Integer> readAnswers(int h){
          return Observable.just(h);
      }
      
      public void handlingStoreErrors(Throwable throwable){
              //Handle Your Exception.
      }
      

      第一个 observable 将发送请求,当他得到响应时将继续第二个,你可以链接,你可以自定义每个方法来处理错误或成功,这个示例就像队列。

      这里是执行结果:

      for (int i = 0; i < 1000; i++) {
              rx.sendRequest(i).subscribe(integer -> System.out.println(integer));
      
      }
      Sending Request : you get Here X 
      X Stored : 0
      0
      Sending Request : you get Here X 
      X Stored : 1
      1
      Sending Request : you get Here X 
      X Stored : 2
      2
      Sending Request : you get Here X 
      X Stored : 3
      3
      .
      .
      .
      Sending Request : you get Here X 
      X Stored : 996
      996
      Sending Request : you get Here X 
      X Stored : 997
      997
      Sending Request : you get Here X 
      X Stored : 998
      998
      Sending Request : you get Here X 
      X Stored : 999
      999
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-11-23
        • 2018-08-12
        • 2013-11-12
        • 2012-04-10
        相关资源
        最近更新 更多