【问题标题】:How to throttle a server request with Rxjs 5?如何使用 Rxjs 5 限制服务器请求?
【发布时间】:2017-11-02 17:17:33
【问题描述】:

我想创建一个函数以在服务器中发出 HTTP PUT 请求,但如果在该时间间隔内多次调用该函数,则每 500 毫秒使用最后一次调用参数发送一次,如果仍在进行中,则取消最后一次请求。

我研究并想出了这个解决方案:

const { Observable } = require('rxjs/Observable')
const { Subject } = require('rxjs/Subject')
const { switchMap, auditTime } = require('rxjs/operators')

// Simulate HTTP request
function makeRequest (val) {
  return Observable.create(observer => {
    console.log('Request:', val);
    observer.next(val);
    observer.complete();
  });
}

const toUpdateStream = new Subject();
const notifier$ = toUpdateStream.pipe(
  auditTime(500),
  switchMap(val => makeRequest(val))
);


function updateThrottle (val) {
  return Observable.create(observer => {
    const lastUpdate$ = notifier$.subscribe(res => {
      observer.next(res);
      observer.complete();
      lastUpdate$.unsubscribe();
    });
    toUpdateStream.next(val);
  });
}

// Try to update 3 times with different parameters
updateThrottle(10).subscribe(val => { console.log('1:', val); });
updateThrottle(20).subscribe(val => { console.log('2:', val); });
updateThrottle(30).subscribe(val => { console.log('3:', val); });

输出是:

Request: 30
1: 30
Request: 30
2: 30
Request: 30
3: 30

问题是我只需要用 30 调用一次请求,而不是每次。

我能做什么?

【问题讨论】:

    标签: javascript rxjs rxjs5


    【解决方案1】:

    因此,您只想将值传递给auditTime,前提是它们已从先前的发射中改变。 pairwise 运算符非常适合这个用例。

    const notifier$ = toUpdateStream.pipe(
      startWith(null),
      pairwise(), // Emit the previous and current search options
      filter(([oldSearch, newSearch]) => oldSearch !== newSearch),
      map(([oldSearch, newSearch]) => newSearch),
      auditTime(500),
      switchMap(val => makeRequest(val))
    );
    

    你也可以使用bufferCount(2, 1),而不是pairwise(),它会做同样的事情。

    【讨论】:

    • 进行更改后,请求现在发出 2 次。这个想法是每 500 毫秒只发出一次请求,如果它被多次调用,即使它是相同的。你知道怎么做吗?谢谢!
    • 我明白了,您需要使用来自startWith(null) 的默认发射“初始化”pariwise(),请参阅我的更新。
    【解决方案2】:

    在尝试了一些东西后,我最终得到了这个最终解决方案:

    const { Observable } = require('rxjs/Observable')
    const { Subject } = require('rxjs/Subject')
    const { auditTime, switchMap } = require('rxjs/operators')
    
    // Simulate log with timestamp
    const log = msg => {
      const d = new Date();
      console.log(d.getSeconds() +'.'+ d.getMilliseconds() +' - '+ msg);
    }
    
    // Simulate HTTP request that takes 750ms
    function makeRequest (val) {
      return Observable.create(observer => {
        const timeout = setTimeout(() => {
          log('Request: '+ val);
          observer.next('R'+ val);  // Mock the HTTP response
          observer.complete();
        }, 750);
        return () => clearTimeout(timeout);
      });
    }
    
    const toUpdateStream$ = new Subject();
    const updatedStream$ = new Subject();
    const filter$ = toUpdateStream$.pipe(
      auditTime(500),
      switchMap(val => makeRequest(val))
    );
    filter$.subscribe(val => updatedStream$.next(val));
    
    function updateThrottle (val) {
      return Observable.create(observer => {
        const lastUpdate = updatedStream$.subscribe(res => {
          observer.next(res);
          observer.complete();
          lastUpdate.unsubscribe();
        });
        toUpdateStream$.next(val);
      });
    }
    
    log('Start');
    
    // Try 3 requests and the last one (the 30) gets processed
    updateThrottle(10).subscribe(val => log('1: '+ val));
    updateThrottle(20).subscribe(val => log('2: '+ val));
    updateThrottle(30).subscribe(val => log('3: '+ val));
    
    // Try to make more requests when the current one isn't finished
    setTimeout(() => {
      // This one cancels the last one
      updateThrottle(40).subscribe(val => log('4: '+ val));
      updateThrottle(50).subscribe(val => log('5: '+ val));
      updateThrottle(60).subscribe(val => log('6: '+ val)); // This gets processed
    }, 600);
    

    还有日志:

    43.394 - Start
    45.275 - Request: 60
    45.277 - 1: R60
    45.277 - 2: R60
    45.278 - 3: R60
    45.278 - 4: R60
    45.278 - 5: R60
    45.278 - 6: R60
    

    它应该只使用最后一个参数发出一个请求并将其返回给侦听器。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-15
      • 1970-01-01
      • 2017-06-13
      • 1970-01-01
      • 2022-01-17
      相关资源
      最近更新 更多