【问题标题】:Queuing tasks with RxJava in Android在 Android 中使用 RxJava 对任务进行排队
【发布时间】:2016-08-01 11:34:37
【问题描述】:

我正在开发具有后台数据同步功能的 Android 应用程序。 我目前正在使用 RxJava 定期在服务器上发布一些数据。 除此之外,我想为用户提供一个“强制同步”按钮,它将立即触发同步。 我知道如何使用Observable.interval() 以固定的时间间隔推送数据,并且我知道如何使用Observalbe.just() 推送强制的数据,但如果发生这种情况,我想将它们排队,同时触发前一个仍在运行。

因此,让我们以 1 分钟为自动同步间隔为例,假设同步持续 40 秒(我在这里夸大其词只是为了更容易理解)。现在,如果有任何机会,用户在自动仍在运行时按下“强制”按钮(反之亦然 - 当强制仍在运行时自动触发),我想将第二个同步请求排队等待第一个结束。

我已经画了这张图片,这可能会给它带来更多的视角:

如您所见,自动触发(由一些Observable.interval()),在同步过程中,用户按下“强制”按钮。现在我们要等待第一个请求完成,然后重新开始强制请求。 在某一时刻,当强制请求运行时,新的自动请求再次被触发,刚刚将其添加到队列中。在队列中的最后一个完成后,一切都停止了,然后稍后再次安排自动。

希望有人能指出我正确的操作员如何做到这一点。我已经尝试使用Observable.combineLatest(),但队列列表在开始时已分派,当我将新同步添加到队列时,它在上一个操作完成时并没有继续。

非常感谢任何帮助, 达科

【问题讨论】:

    标签: android rx-java rx-android


    【解决方案1】:

    虽然有一个很好的解决方案的公认答案,但我想分享另一个使用 Scheduler 和 SingleThreadExecutor 执行此操作的选项

    public static void main(String[] args) throws Exception {
        System.out.println(" init ");
        Observable<Long> numberObservable =
                Observable.interval(700, TimeUnit.MILLISECONDS).take(10);
    
        final Subject subject = PublishSubject.create();
    
        Executor executor = Executors.newSingleThreadExecutor();
        Scheduler scheduler = Schedulers.from(executor);
        numberObservable.observeOn(scheduler).subscribe(subject);
    
        subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
                        onCompleteFunc("subscriber 1"));
    
        Thread.sleep(800);
        //simulate action
        executor.execute(new Runnable() {
            @Override
            public void run() {
                subject.onNext(333l);
            }
        });
    
        Thread.sleep(5000);
    }
    
    static Action1<Long> onNextFunc(final String who) {
        return new Action1<Long>() {
            public void call(Long x) {
                System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
                        + " -- " + System.currentTimeMillis());
                try {
                    //simulate some work
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }
    
    static Action1<Throwable> onErrorFunc(final String who) {
        return new Action1<Throwable>() {
            public void call(Throwable t) {
                t.printStackTrace();
            }
        };
    }
    
    static Action0 onCompleteFunc(final String who) {
        return new Action0() {
            public void call() {
                System.out.println(who + " complete");
            }
        };
    }
    

    【讨论】:

      【解决方案2】:

      您可以通过将计时器与单击按钮Observable/Subject 合并来实现此目的,使用onBackpressureBuffer 的排队效果并将处理concatMap 到其中,以确保一次运行一个。

      PublishSubject<Long> subject = PublishSubject.create();
      
      Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);
      
      periodic.mergeWith(subject)
      .onBackpressureBuffer()
      .concatMap(new Func1<Long, Observable<Integer>>() {
          @Override
          public Observable<Integer> call(Long v) {
              // simulates the task to run
              return Observable.just(1)
                      .delay(300, TimeUnit.MILLISECONDS);
          }
      }
      ).subscribe(System.out::println, Throwable::printStackTrace);
      
      Thread.sleep(1100);
      // user clicks a button
      subject.onNext(-1L);
      
      Thread.sleep(800);
      

      【讨论】:

      • 好的,这是一个很好的方向,但我有几个问题......匿名函数是怎么回事?我没有使用 Java 8,因为我必须为 API 级别 16 进行开发。另外,哪一行模拟按钮按下?
      • lambda 表示同步应该发生的位置 - 以可观察的形式。 subject.onNext() 是用户点击按钮的地方。
      • 如果你能帮助我,再添加一个问题。所以我按照@akarnokd 的建议做了这个,一切正常,正是我需要的。现在我正在尝试在链完成后拨打onComplete() 电话。如果你看上面的图片,第一个链将在第三个同步过程之后结束。这是因为也许我想在链结束后而不是每次同步后更新 UI。
      • 如果您的任务产生单个值,您可以在订阅服务器或 Action1 中更新 UI(而不是 System.out::println)。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-03-24
      • 1970-01-01
      • 2015-07-08
      • 2015-10-05
      相关资源
      最近更新 更多