【问题标题】:Is there a way to loop an Observable and use the previous iteration to influence the next?有没有办法循环一个 Observable 并使用前一个迭代来影响下一个迭代?
【发布时间】:2021-04-09 08:48:11
【问题描述】:

我是 rxjs 的新手,想用它来构建视频下载器。其目的是 24/7 全天候运行并自动录制偶尔的直播以供日后观看。这是我目前所拥有的。

import { BehaviorSubject, from, defer, of } from "rxjs";
import { delay, mergeMap, repeat, tap } from "rxjs/operators";

const downloader = url => {
  const defaultDelay = 1000;
  const maxDelay = 10000;
  const delayTime = new BehaviorSubject(defaultDelay);

  /*
   * Simulated download output.
   * 
   * @return {String|Number} potentialOutput 
   *         A {Number} 1 means "FAILURE, stream is offline."
   *         A {String} means "SUCCESS, video was downloaded."
   *         1 is the most likely value returned
   * 
   * greets https://stackoverflow.com/a/8877271/1004931
   */
  function randomWithProbability() {
    var potentialOutput = [1, 1, 1, 1, 1, "/tmp/video.mp4"];
    var idx = Math.floor(Math.random() * potentialOutput.length);
    return potentialOutput[idx];
  }

  /**
   * Simulated download. Returns a promise which resolves after 1 second.
   */
  const download = url => {
    let downloadP = new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve(randomWithProbability());
      }, 1000);
    });
    return from(downloadP);
  };

  /**
   * Conditionally adjust the delay inbetween download attempts.
   *   - If the video downloaded successfuly, reset the timer to it's default.
   *     (in case the stream went down by error, we want to record again ASAP.)
   *   - If the video stream was offline, increase the delay until our next download attempt.
   *     (we don't want to be rude and flood the server)
   */
  const adjustTimer = (ytdlOutput) => {
    if (typeof ytdlOutput === 'string') {
      delayTime.next(defaultDelay); // video stream exited successfully, so reset in case the stream starts again
    } else {
      let adjustedTime = (delayTime.getValue() * 2 > maxDelay) ? maxDelay : delayTime.getValue() * 2;
      delayTime.next(adjustedTime); // video stream exited abnormally, likely due to being offline. wait longer until next attempt
    }
  };

  /**
   * The Observable.
   *   1. Start with the URL of the video stream
   *   2. delay by the time defined in delayTime
   *   3. download, merging the download observable with the parent observable.
   *   4. adjust the delayTime based on download output.
   *   5. repeat the process indefinitely.
   */
  const stream = of(url)
    .pipe(
      delay(delayTime.getValue()),
      mergeMap(download),
      tap(res => {
        adjustTimer(res);
      }),
      repeat()
    )
    
  stream.subscribe(val => {
    console.log(
      `download result:${val}, delayTime:${delayTime.getValue()}`
    );
  });
};

downloader("https://example.com/files/video.mp4");

(Stackblitz)

我遇到的问题是 {BehaviorSubject} delayTime 在我的循环的每次迭代中都没有得到更新。 delayTime is 正在更新,如在订阅者的回调中调用 delayTime.getValue() 所示,但更改对 observable/subscriber(?) 的内存(?) 没有影响.

相反,我看到可观察对象的范围(?)中的 delayTime 保持不变,就像它第一次订阅时一样。在 observable 的世界中,没有像我希望的那样更新 BehaviorSubject 的值。

这就是我卡住的地方。如何重构我的代码以具有随时间变化的延迟计时器,并影响到下一次下载尝试的延迟?

【问题讨论】:

    标签: javascript loops rxjs observable reactive-programming


    【解决方案1】:

    暂时忽略 rxjs,假装你不知道这些函数是什么意思看这段代码:

      const stream = of(url)
        .pipe(
          delay(delayTime.getValue()),
          mergeMap(download),
          tap(res => {
            adjustTimer(res);
          }),
          repeat()
        )
    

    一个匿名的简单版本是

    someFunc(delayTime.getValue())
    

    这里的问题是 delayTime.getValue() 被直接评估,而不是在 someFunc 运行时。上面的代码也是如此:评估发生在创建 stream 变量时,而不是每次“迭代”(更好的词:发射)。


    延迟运算符仅适用于固定延迟。出于您的目的,您想使用delayWhen,它会针对每次发射进行评估:

    delayWhen(() => timer(delayTime.getValue())
    

    但是请注意,我们需要返回一个可观察的通知器,而不是所需的毫秒延迟。


    最后一点,访问getValue 是一个危险信号,因为它没有正确使用可观察对象。这也是我们实际上不使用delayWhen 中提供给回调的参数的原因。您的代码可以进行重构以使其具有适当的反应性,但这超出了这里的范围。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-15
      • 1970-01-01
      • 2015-08-27
      • 1970-01-01
      • 2018-02-28
      • 2019-01-23
      • 2011-02-15
      • 1970-01-01
      相关资源
      最近更新 更多