【问题标题】:RxJS Service Call Throtting / QueuingRxJS 服务调用限制/排队
【发布时间】:2016-05-10 13:24:47
【问题描述】:

我正在尝试使用 RxJS 来实现服务调用节流/排队。

例如,谷歌地图的地理编码器 API。假设我不希望每秒调用一次以上,但我的应用程序的一个或多个部分可能会比​​这更频繁地请求地理编码。我希望请求排队,相邻的请求相隔至少 1s,但如果在此等待期间不再需要请求,我也希望能够“取消”请求。

这是 RxJS 的适用用途吗?如果是,它会是什么样子?

谢谢。

【问题讨论】:

  • 是的,这是 Rxjs 的适用用法。您应该能够找到一种使用 Rxjs 运算符的方法,例如 scansample、也许是 bufferWithTime 等等。当您有工作代码时,请不要犹豫将其发布在这里。其他有同样担忧的人会很感激。如果您无法获得有效的代码,请发回您的问题。
  • 多谢指点,我看看能不能整理出来,在这里分享一下。
  • 这也是一本很好的读物:github.com/Reactive-Extensions/RxJS/blob/master/doc/…controlled 运算符允许您管理请求队列,并在您想要的时间提取它们。

标签: rxjs


【解决方案1】:

这里有一些应该指导你的东西 (jsfiddle):

// Helper functions
function remove_from_queue(queue, id) {
  queue.forEach(function(x, index){
    if (x.execute.request === id) {
      queue.splice(index, 1);
    }
  });
//  console.log('queue after removal', queue);
}

function add_to_queue (queue, id){
  queue.push({execute : {request : id}});
}

function getFirstInQueue(queue){
  return queue[0];
}

function noop(x) {}

function log(label) {
  return function (x) {
    console.log.call(console, label, x);
  }
}

function timestamp(label){
  return function (x) {
    console.log.call(console, Date.now() - startingDate, label,x );
  }
}

function label(label){
  return function (x) {
    var res = {};
    res[label] = x;
    return res;
  }
}

var startingDate = Date.now();

var requests$ = Rx.Observable.generateWithRelativeTime(
  {request : 1},
  function (x) { return x.request < 10; },
  function (x) { return {request : x.request + 1}; },
  function (x) { return x; },
  function (x) { return 100 ; }
);

var cancelledRequests$ = Rx.Observable.generateWithRelativeTime(
  {request : 1},
  function (x) { return x.request < 20; },
  function (x) { return {request : x.request + 4}; },
  function (x) { return x; },
  function (x) { return 500 ; }
);

var timer$ = Rx.Observable.interval(990).map(function (){return {}}).take(10);

var source$ = Rx.Observable.merge(
  requests$.map(label('execute')),
  cancelledRequests$.map(label('cancel')),
  timer$
)
//.do(log('source'));

controlledSource$ = source$
  .scan(function (state, command){
    var requestsToExecuteQueue = state.requestsToExecuteQueue;
    if (command.cancel) {
      remove_from_queue(requestsToExecuteQueue, command.cancel.request);
    }
    if (command.execute) {
      add_to_queue(requestsToExecuteQueue, command.execute.request);
    }
    console.log('queue', requestsToExecuteQueue.slice())

    return {
      command : command,
      requestExec$ : Rx.Observable
        .return(getFirstInQueue(requestsToExecuteQueue))
        .filter(function(x){return x})
        .do(function(x){remove_from_queue(requestsToExecuteQueue, x.execute.request)}),
      requestsToExecuteQueue : requestsToExecuteQueue
    }
  }, {command : undefined, requestExec$ : undefined, requestsToExecuteQueue : []})
  .pluck('requestExec$')
  .sample(Rx.Observable.interval(1000))
  .mergeAll();

controlledSource$.do(timestamp('executing request:')).subscribe(noop)

基本上:

  • scan 用于管理状态(请求队列、添加和删除)
  • 对于每个请求,我们传递一个 observable(当订阅时)释放队列的第一个元素,并从队列中删除该元素
  • sample 用于每秒获得一个这样的 observable
  • mergeAll 允许订阅该 observable
  • 我们必须使用timer$ 对象来继续轮询队列,即使请求源已经完成(您仍然需要清空剩余请求的队列)。您可以通过在源代码完成后让 timer$ 发射 X 秒或任何最适合您的方式来使该逻辑适应您的实际情况。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-24
    • 1970-01-01
    • 2016-04-29
    相关资源
    最近更新 更多