【问题标题】:Create a Observable that delays the next value创建一个延迟下一个值的 Observable
【发布时间】:2026-01-22 15:45:01
【问题描述】:

我正在尝试使用 RxJS 创建一个可观察的对象来执行图片中的操作。

  • 获取一个值并等待一段固定的时间,然后再获取 下一个。
  • 下一个将是在周期内发出的最后一个值 等等,跳过其余的。
  • 如果等待间隔过去没有发出任何值,则下一个 应该立即抓取作为图像的最后一个示例 描绘。

【问题讨论】:

  • 感谢您制作大理石图。

标签: observable reactive-extensions-js rxjs


【解决方案1】:

这应该可以解决问题。

var Rx      = require('rx'),
    source  = Rx.Observable.interval(10).take(100),
    log     = console.log.bind(console);

Rx.Observable.create(function (observer) {

    var delaying = false,
        hasValue = false,
        complete = false,
        value;

    function onNext (x) {
      value = x;
      if (delaying) {
        hasValue = true;
      } else {
        sendValue();
      }
    }

    function sendValue () {
      observer.onNext(value);
      if (complete) {
        observer.onCompleted();
      } else {
        setTimeout(callback, 1000); // exercise for the reader. Use a scheduler.
      }
      delaying = true;
    }

    function callback () {
      if (hasValue) {
        hasValue = false;
        sendValue();
      } else {
        delaying = false;
      }
    }

    return source.subscribe(
        onNext,
        observer.onError.bind(observer),
        function () {
          if (hasValue) {
            complete = true;
          } else {
            observer.onCompleted();
          }
        }
      );
  })
  .subscribe(log);

【讨论】:

  • 很好的解决方案。完美运行。是否也可以将其转换为 LINQ 运算符?所以我们会有这样的东西:source.delayNext(other).subscribe() 这样我们就可以使用other 值来动态选择延迟下一项的时间。
  • 当然。您应该查看 Rx 源并了解如何执行此操作。 :)
【解决方案2】:

这是 Christopher 的解决方案,修改为运算符。

throttleImmediate 运算符仅存储来自源的最新值,直到给定选择器完成。它会在每次完成后立即触发缓存值(如果存在)。 它最适合在选择器有副作用(例如动画)时使用。

var Rx  = require('rx'),
source  = Rx.Observable.interval(10).take(500),
log     = console.log.bind(console);

Rx.Observable.prototype.throttleImmediate = function (selector) {
    var source = this;

    return Rx.Observable.create(function (observer) {

        var delaying = false,
            hasValue = false,
            complete = false,
            value;

        function onNext (x) {
          value = x;
          if (delaying) {
            hasValue = true;
          } else {
            sendValue();
          }
        }

        function sendValue () {
          delaying = true;
          selector(value).subscribe(
            observer.onNext.bind(observer),
            observer.onError.bind(observer),
            function(){
              if (hasValue) {
                hasValue = false;
                sendValue();
              } else {
                delaying = false;
              }
            }
          );
        }

        return source.subscribe(
            onNext,
            observer.onError.bind(observer),
            function () {
              if (hasValue) {
                complete = true;
              } else {
                observer.onCompleted();
              }
            }
          );
      });
};

source
  .throttleImmediate(function(data){
    var delay;

    if(data%2==0)
      delay=500;
    else
      delay=1000;

    return Rx.Observable.timer(delay).map(function(){ return data; });
  })
  .subscribe(log)

这在向只有选择器知道要延迟的值的源施加压力时派上用场。

示例: 鉴于问题的大理石图。

假设第一个来源是带有要显示的 html 数据的 ajax 调用,ajaxPages 源自对导航栏的点击。 我们希望将它们与入口动画一起渲染,animatePage,其持续时间是动态的。

ajaxPages.throttleImmediate(animatePage).subscribe();

在这里,我们使用来自源的值对页面进行动画处理,跳过动画期间发出的所有值,除了最新的值。

在实践中,我们得到的是一个忽略点击的流,这些点击紧随其后的是其他点击,并且对于向用户显示毫无用处,因为它们会动画化,然后立即动画化。

【讨论】:

  • until 后修复通常与自动处置 (takeUntil) 结合使用。我建议使用除此之外的其他东西,因为delayUntil 听起来您想延迟一个值,直到“animatePage”可观察到的产量。实际上,操作员可以更准确地描述为throttle。但是,Rx 中已经存在同名的运算符。那个操作符真的应该被命名为debounce,但此时我们真的无能为力。因此,我会选择thottleImmediate
  • throttle 运算符是一个悲伤的故事,但现在重命名它已经用得太多了...:/