【问题标题】:Implementing a rate limited search results using `Observable`s使用 `Observable` 实现速率限制搜索结果
【发布时间】:2017-05-15 09:20:15
【问题描述】:

假设我有两个Observables:
obs1 在搜索框中发出用户输入的结果,
obs2 将字符串作为输入并发起 HTTP 请求,然后提供结果

现在,我想限制 HTTP 请求的数量,而不是通过某个恒定的时间间隔,而是取决于 obs2 何时完成当前请求,如下所示:

  1. 用户类型tobs2 立即以t 开始请求
  2. 用户类型teobs2 仍然“忙碌”,没有任何反应
  3. 用户类型tesobs2 仍然“忙碌”,没有任何反应
  4. 用户类型testobs2 仍然“忙”,没有任何反应
  5. t-HTTP 响应已到达,obs2 现在“免费”,它查看 obs1 最后发出的值并找到 test,开始一个新请求
  6. test-HTTP 响应已到达,obs2 现在“免费”,它查看 obs1 最后发出的值并找到 test,因为该值没有改变,所以什么也不做。

我可以通过引入指示系统状态和搜索查询累加器的附加变量来做到这一点,但我想知道这是否可以以纯粹的功能方式完成,即单独使用 rxJava 方法?

【问题讨论】:

  • 您知道这可能会导致近 2 倍的延迟延迟吗?发出多个 Web 请求以避免最终用户的长时间延迟不是更好吗?

标签: rx-java system.reactive


【解决方案1】:

查看代码和注释。

import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import xdean.jex.extra.Pair;

public class Q43975663 {
  public static void main(String[] args) throws InterruptedException {
    PublishSubject<String> textSub = PublishSubject.create(); // emit user input text
    PublishSubject<String> taskSub = PublishSubject.create(); // emit when execution thread is free
    // core
    Observable
        // when new text input or execution thread change to free, emit an item
        .combineLatest(textSub.distinctUntilChanged(), taskSub, Pair::of)
        // if the text not change or task cycle not change, ignore it
        .scan((p1, p2) ->
            (p1.getLeft().equals(p2.getLeft()) || p1.getRight().equals(p2.getRight())) ?
                p1 : p2)
        .distinctUntilChanged()
        // map to user input text
        .map(Pair::getLeft)
        // scheduler to IO thread
        .observeOn(Schedulers.io())
        // do HTTP request
        .doOnNext(Q43975663::httpTask)
        // emit item to notify the execution thread is free
        .doOnNext(taskSub::onNext)
        .subscribe();
    // test
    taskSub.onNext("start");
    textSub.onNext("t");
    textSub.onNext("te");
    textSub.onNext("tex");
    textSub.onNext("text");
    Thread.sleep(5000);
    textSub.onNext("new");
    textSub.onNext("new");
    textSub.onNext("text");
    Thread.sleep(5000);
  }

  static void httpTask(String id) {
    System.out.printf("%s \tstart on \t%s\n", id, Thread.currentThread());
    try {
      Thread.sleep((long) (Math.random() * 1000));
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.printf("%s \tdone on \t%s\n", id, Thread.currentThread());
  }
}

注意Pair 是一个简单的类,有两个值,left 和 right。

输出:

t       start on    Thread[RxIoScheduler-2,5,main]
t       done on     Thread[RxIoScheduler-2,5,main]
text    start on    Thread[RxIoScheduler-2,5,main]
text    done on     Thread[RxIoScheduler-2,5,main]
new     start on    Thread[RxIoScheduler-2,5,main]
new     done on     Thread[RxIoScheduler-2,5,main]
text    start on    Thread[RxIoScheduler-2,5,main]
text    done on     Thread[RxIoScheduler-2,5,main]

【讨论】:

    猜你喜欢
    • 2014-12-20
    • 1970-01-01
    • 1970-01-01
    • 2012-03-08
    • 1970-01-01
    • 2015-03-30
    • 2016-10-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多