【问题标题】:rxjs periodic polling of an endpoint with a variable response timerxjs 定期轮询具有可变响应时间的端点
【发布时间】:2018-01-11 17:26:06
【问题描述】:

我希望轮询端点不快于每秒一次,并且不慢于轮询端点所需的时间。决不能有超过一个未完成的请求。

我想要一种响应式编程方式来至少每秒轮询一次端点,但如果端点花费的时间超过 1 秒,则下一个请求会立即触发。

在下面的弹珠图中,第 2 次和第 3 次请求花费的时间超过 1 秒,但第 4 次和第 5 次请求完成得更快。下一个请求在 1 秒边界触发,或者在从最后一个未完成的请求中获取数据后立即触发。

s---s---s---s---s---s---| # 1 second interval observable
r---r----r--------r-r---| # endpoint begin polling events
-d-------d--------dd-d--| # endpoint data response events

我正在尝试在大理石图中正确使用术语,所以我 假设端点请求的开始应该是 大理石我标记为“r”,大理石事件我标记为“d”有端点数据。

这是我用纯 js 完成此操作所需的代码量;但是,正如我上面所要求的那样,后续请求不会在获得后立即触发。

var poll;
var previousData;
var isPolling = false;
var dashboardUrl = 'gui/metrics/dashboard';
var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

return {
    startInterval: startInterval,
    stopInterval: stopInterval
};

function startInterval() {
    stopInterval();
    tryPolling(); // immediately hit the dashboard
    // attempt polling at the interval
    poll = $interval(tryPolling, intervalMs);
}

/**
 * attempt polling as long as there is no in-flight request
 * once the in-flight request completes or fails, allow the next request to be processed
 */
function tryPolling() {
    if (!isPolling) {
        isPolling = true;

        getDashboard()
        // if the dashboard either returns successful or fails, reset the polling boolean
            .then(resetPolling, resetPolling);
    }
}

/** there's no longer an in-flight request, so reset the polling boolean */
function resetPolling() {
    isPolling = false;
}

function stopInterval() {
    if (poll) {
        $interval.cancel(poll);
        poll = undefined;
    }
}

function getDashboard() {
    return restfulService.get(dashboardUrl)
        .then(updateDashboard);
}

function updateDashboard(data) {
    if (!utils.deepEqual(data, previousData)) {
        previousData = angular.copy(data);
        $rootScope.$broadcast('$dashboardLoaded', data);
    }
}

【问题讨论】:

    标签: rxjs observable


    【解决方案1】:

    这是我的解决方案。它使用内部主题 combineLatestfilter 来确保在响应到达速度比 timer 周期慢时不会累积请求。

    cmets 应该解释它是如何工作的。

    const delays = [100, 2000, 100, 3000];
    const since = Date.now();
    let index = 0;
    
    function mock() {
        return Rx.Observable
        .of("res")
        .do(() => console.log("mock req at ", Date.now() - since, " ms"))
        .delay(delays[index++ % delays.length])
        .do(() => console.log("mock res at ", Date.now() - since, " ms"));
    }
    
    function poll() {
    
      return Rx.Observable.defer(() => {
    
        // Use defer so that the internal subject is created for each
        // subscription.
        const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });
    
        return Rx.Observable
        
          // Combine the timer and the subject's state.
          .combineLatest(
            Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
            subject
          )
    
          // Filter out combinations in which either a more recent tick
          // has not occurred or a request is pending.
          .filter(([tick, state]) => (tick !== state.tick) && !state.pending)
    
          // Update the subject's state.
          .do(([tick]) => subject.next({ tick, pending: true }))
          
          // Make the request and use the result selector to combine
          // the tick and the response.
          .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])
    
          // Update the subject's state.
          .do(([tick]) => subject.next({ tick, pending: false }))
          
          // Map the response.
          .map(([tick, resp]) => resp);
      });
    }
    
    poll().take(delays.length).subscribe(r => console.log(r));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

    我突然想到有一个操作员可以做到这一点:exhaustMap

    const delays = [100, 2000, 100, 3000];
    const since = Date.now();
    let index = 0;
    
    function mock() {
      return Rx.Observable
        .of("res")
        .do(() => console.log("mock req at ", Date.now() - since, " ms"))
        .delay(delays[index++ % delays.length])
        .do(() => console.log("mock res at ", Date.now() - since, " ms"));
    }
    
    const poll = Rx.Observable
      .timer(0, 1000)
      .do(tick => console.log("tick", tick))
      .exhaustMap(() => mock());
    
    poll.take(delays.length).subscribe(r => console.log(r));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

    【讨论】:

    • 更新了答案以使用exhaustMap - 解决方案很简单。
    • 我可能对实现感到困惑,但我的示例应用程序将挂起的请求排队,而不是等待它们完成,然后再排队下一个请求。 codepen.io/activedecay/pen/EoRNJv
    • 哦!我修好了它。见代码笔。我需要传递一个返回承诺的函数,而不是承诺本身
    • 这里有一篇很棒的关于映射策略的文章,应该有助于理解为什么 outletMap 是这里的正确解决方案。 blog.angular-university.io/rxjs-higher-order-mapping
    【解决方案2】:

    我相信这可以满足您的需求:

    let counter = 0;
    function apiCall() {
      const delay = Math.random() * 1000;
      const count = ++counter;
      return Rx.Observable.timer(delay).mapTo(count);
    }
    
    Rx.Observable.timer(0, 1000)
      .mergeMap(() => apiCall())
      .take(1)
      .repeat()
      .subscribe(x => { console.log(x); });
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
    • timer(0, 1000):立即发射,之后每隔一秒发射一次
    • mergeMap(...):切换到 api 调用返回的 observable。这将在每次重试时生成一个新的 observable。如果您不想在每次重试时创建一个新的,请将其替换为 mergeMapTo(apiCall())
    • take(1):强制订阅完成,因此一旦 api 发出,计时器就不会触发
    • repeat():在 api 发出时重新开始序列

    所以调用将立即调用 api。如果它在一秒钟内没有返回,则每秒将进行另一个调用。一旦有来自 api 调用之一的响应,计时器将被取消,整个序列重新开始。这不会取消我认为符合您意图的正在进行的请求。

    编辑:如果后一个请求在前一个请求之前返回,那么前一个请求将被丢弃。

    【讨论】:

    • @activedecay 抱歉,我意识到我之前的扩展有点偏离。因此,澄清take(1) 将完成其背后的蒸汽,这将杀死timer(0, 1000)(主要目标)和从API调用返回的可观察对象(如果尚未完成)。然后重播将重新开始 Steam 并阻止订阅完成。
    • 我发现了一个问题,将随机超时更改为 5 秒时,似乎有多个 api 命中未记录在您的解决方案中。 codepen.io/activedecay/pen/WdJMPv?editors=1011 在我的实际实现中,它似乎复制了我在代码笔测试中发现的内容;当另一个请求排队时,未完成的请求仍在等待处理。
    • 这个方案绝对比我的好!但这里有一件重要的事情需要注意:背压。如果出于任何原因您的 API 调用花费超过 1 秒,它将开始产生背压(计时器将继续运行)。如果您进行轮询,我想这是因为您不想长时间刷新页面,这最终可能会成为问题。再一次,我现在没有更好的主意,这是迄今为止最好的选择;)
    • 我没有看到你的评论@activedecay,但这就是我在说的......这是一个棘手/有趣的问题,我希望能提供最佳答案:)
    • @activedecay 如果后面的请求在前一个请求之前返回,那么前一个请求会被丢弃。也许您可以考虑这是一个功能;)无论如何,我同意背压。如果一个请求很慢,那么在大多数情况下,我预计下一个请求也会很慢。您可能希望将计时器从 1 秒提高到更高的时间 (15) 以避免背压但仍能处理异常情况。或者使用 websockets ;)
    【解决方案3】:

    在我想出一个仅基于 rxjs 且没有副作用(没有变量分配)且没有背压的答案之前,我确实必须考虑 1500 万!

    const { Observable } = Rx;
    
    const mockHttpRequest = url =>
      Observable
        .of('ok')
        .do(x => console.log('fetching...'))
        .delay(250);
    
    const poll = (httpRequest$, ms) => {
      const tick$ = Observable.timer(ms);
    
      return Observable
        .zip(httpRequest$, tick$)
        .repeat()
        .map(([httpResult]) => httpResult);
    };
    
    poll(mockHttpRequest('your-url-here'), 1000)
      .do(console.log)
      .subscribe();
    

    这是一个正常工作的 Plunkr:https://plnkr.co/edit/sZTjLedNCE64bgLNhnaS?p=preview

    【讨论】:

    • 由于.zip 至少等待所有可观察对象发射一次,即使http 请求在不到1 秒内完成,它也会等待tick$ 发射(延迟1 秒)。看起来这最多每秒发射一次。
    • 如果 bygrace 是正确的,这将不是我提出的问题的解决方案,因为如果可以的话,最好尽可能快的响应。
    • 这可能是我最终选择的一个,因为它的用途与我的 plain-js 解决方案几乎完全相同,而且行数更少。背压问题不容忽视——如果服务器响应时间过长,我不想排队另一个请求。这对我来说比等待不到 1 秒来获取下一个数据点更重要。干杯队友!
    • 不是模拟请求,您可以将答案更改为会影响资源的实际实现,最好使用承诺?我似乎无法做到这一点。 codepen.io/activedecay/pen/ppVLBL
    • 你可以用任何你想要的替换mockHttpRequest('your-url-here'),即使你返回一个promise rxjs会自己处理它;)例如使用角度你会放http.get('your-url-here')
    猜你喜欢
    • 1970-01-01
    • 2020-02-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多