【问题标题】:RxJS throttle behavior; get first value immediatelyRxJS 油门行为;立即获得第一个值
【发布时间】:2016-09-05 21:03:41
【问题描述】:

Plunkr 示例:https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

我知道 RxJS (5.0 beta.4) 有这 3 种节流方法:

auditTime()throttleTime()debounceTime()

我正在寻找的行为是 lodash 默认在 throttle 上执行的行为:

    1. 立即给我第一个值!
    1. 在连续值上,保持给定延迟的值,然后发出最后出现的值
    1. 当油门延迟到期时,返回状态 (1)

理论上应该是这样的:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

但是

  • throttleTime 永远不会给我最后一个值,如果在油门超时发出的话
  • debounceTime 不会立即触发
  • auditTime 不会立即触发

我可以结合任何 RxJS 方法来实现所描述的行为吗?

【问题讨论】:

    标签: javascript rxjs throttling


    【解决方案1】:

    对于 2018 年之后寻找此内容的任何人:此内容已在一年多前添加,但由于某种原因,文档尚未更新。

    RxJS commit

    您可以简单地将配置对象传递给throttleTime。默认值为{ leading: true, trailing: false }。要实现此处讨论的行为,您只需将trailing 设置为true{ leading: true, trailing: true }

    编辑:

    为了完整起见,这里是一个有效的 sn-p:

    import { asyncScheduler } from 'rxjs'
    import { throttleTime } from 'rxjs/operators'
    
    ...
    
    observable.pipe(
      throttleTime(100, asyncScheduler, { leading: true, trailing: true })
    )
    

    【讨论】:

    • 值得注意的是,如果您想根据时间以外的其他因素进行节流(例如 ajax 完成) - 您可以使用 throttle(() => Promise.resolve(...), { leading: true, trailing:true }) - 我发现这对于立即运行 ajax puts、等待完成然后运行特别有用当第一个完成时,最后一个放在一个系列中。
    • 这是否意味着 auditTime 已经过时了?
    【解决方案2】:

    对于较旧的 RxJ,我编写了一个 concatLatest 运算符,它可以满足您的大部分需求。有了它,您可以使用以下代码获得节流行为:

    const delay = Rx.Observable.empty().delay(500);
    inputObservable
        .map(value => Rx.Observable.of(value).concat(delay))
        .concatLatest()
        .subscribe(...);
    

    这里是运算符。我尝试更新它以使用 RxJS5:

    Rx.Observable.prototype.concatLatest = function () {
        /// <summary>
        /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
        /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
        /// away and only the Nth will be observed.
        /// </summary>
        /// <returns type="Rx.Observable"></returns>
        var source = this;
    
        return Rx.Observable.create(function (observer) {
            var latest,
                isStopped,
                isBusy,
                outerSubscription,
                innerSubscription,
                subscriptions = new Rx.Subscription(function () {
                  if (outerSubscription) {
                    outerSubscription.unsubscribe();
                  }
                  if (innerSubscription) {
                    innerSubscription.unsubscribe();
                  }
                }),
                onError = observer.error.bind(observer),
                onNext = observer.next.bind(observer),
                innerOnComplete = function () {
                    var inner = latest;
                    if (inner) {
                        latest = undefined;
                        if (innerSubscription) {
                          innerSubscription.unsubscribe();
                        }
                        innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                    }
                    else {
                        isBusy = false;
                        if (isStopped) {
                            observer.complete();
                        }
                    }
                };
    
            outerSubscription = source.subscribe(function (newInner) {
                if (isBusy) {
                    latest = newInner;
                }
                else {
                    isBusy = true;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
                }
            }, onError, function () {
                isStopped = true;
                if (!isBusy) {
                    observer.complete();
                }
            });
    
            return subscriptions;
        });
    };
    

    这是一个更新的 plunkr:https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview

    请注意,我已将您的 lodash 版本更新为最新版本。在 lodash 4.7 中,我重写了油门/去抖动操作符以修复一些边缘情况错误。您使用的 4.6.1 仍然存在一些错误,但我认为它们不会影响您的测试。

    【讨论】:

    • 嗨@Brandon,很好的答案。您能否使用 RxJS 4.x 提供带有 concatLatest 运算符的 plunkr 链接? (我不会问,因为这个问题是针对 RxJS 5 的,但你说你已经很方便了 :)
    • @HipsterZipster 我为 RxJS 2 编写了这个版本。我认为它也可以在 4 中工作:gist.github.com/bman654/92749cd93cdd84a540e41403f25a2105
    【解决方案3】:

    我采用了 auditTime 运算符并更改了 2 行以实现所需的行为。

    新插件:https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

    原文:

    变化:

    来自(审计时间):

    protected _next(value: T): void {
      this.value = value;
      this.hasValue = true;
      if (!this.throttled) {
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
      }
    }
    
    clearThrottle(): void {
      const { value, hasValue, throttled } = this;
      if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
      }
      if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
      }
    }
    

    到(auditTimeImmediate):

    protected _next(value: T): void {
        this.value = value;
        this.hasValue = true;
        if (!this.throttled) {
            // change 1:
            this.clearThrottle();
        }
    }
    
    clearThrottle(): void {
        const { value, hasValue, throttled } = this;
        if (throttled) {
            this.remove(throttled);
            this.throttled = null;
            throttled.unsubscribe();
        }
        if (hasValue) {
            this.value = null;
            this.hasValue = false;
            this.destination.next(value);
            // change 2:
            this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
        }
    }
    

    所以我在值为nexted 之后开始超时。

    用法:

    inputObservable
      .do(() => cancelPreviousRequest())
      .auditTimeImmediate(500)
      .subscribe((value) => doNextRequest(value))
    

    【讨论】:

    • 有没有办法在最新版本中做到这一点,而无需更改 rxjs 源?
    猜你喜欢
    • 2012-08-10
    • 2021-04-26
    • 1970-01-01
    • 2019-10-04
    • 2017-08-26
    • 1970-01-01
    • 1970-01-01
    • 2012-05-15
    • 2018-12-15
    相关资源
    最近更新 更多