【问题标题】:RXJS Observable stretchRXJS 可观察拉伸
【发布时间】:2017-07-16 19:32:28
【问题描述】:

我有一个 Rx.Observable.webSocket 主题。我的服务器端点无法处理同时接收的消息(

我创建了另一个主题 requestSubject 并订阅了它。 然后在订阅中调用 websocket 的 next 。

requestSubject.delay(1000).subscribe((request) => {
  console.log(`SENDING: ${JSON.stringify(request)}`);
  socketServer.next(JSON.stringify(request));
});

使用延迟将每个下一个调用移动相同的延迟时间,然后所有下一个调用稍后发出相同的时间......这不是我想要的。

我试过delaythrottledebounce,但不合适。

以下应该说明我的问题

Stream 1 | ---1-------2-3-4-5---------6----

    after some operation ...

Stream 2 | ---1-------2----3----4----5----6-

【问题讨论】:

  • 在您收到的每个值上,您可以映射该值并仅针对该值返回一个延迟的 observable,例如 myStream.flatMap(val => if(whatever reason){return Rx.Observable.of(val ).delay(10)} else { return Observable.of(val) ...

标签: javascript rxjs reactive-programming


【解决方案1】:

不得不修改一下,它并不像看起来那么容易:

//example source stream
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
  .mergeMap(i => Rx.Observable.of(i).delay(i))
  .share();

stretchEmissions(source, 1000)
  .subscribe(val => console.log(val));

function stretchEmissions(source, spacingDelayMs) {
  return source
    .timestamp()
    .scan((acc, curr) => {
      // calculate delay needed to offset next emission
      let delay = 0;
      if (acc !== null) {
        const timeDelta = curr.timestamp - acc.timestamp;
        delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
      }
  
      return {
        timestamp: curr.timestamp,
        delay: delay,
        value: curr.value
      };
    }, null)
    .mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

基本上,我们需要计算发射之间所需的延迟,以便将它们间隔开。我们使用timestamp() 的原始排放和mergeMap 的并发量为 1 的重载来做到这一点,以便仅在发出前一个延迟值时订阅下一个延迟值。这是一种没有其他副作用的纯 Rx 解决方案。

【讨论】:

  • 这是一个很好的解决方案。完美运行
【解决方案2】:

这里有两个使用自定义流且仅使用 rxjs-operators 的解决方案 - 因为它看起来很复杂,我建议您使用此解决方案,但使用自定义流(参见第一个示例下面):

自定义流(更易于阅读和维护,可能还具有更好的性能):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

const spreadDelay = 1000;
let prevEmitTime = 0;

click$
  .concatMap(i => {  // in this case you could also use "flatMap" or "mergeMap" instead of "concatMap"
    const now = Date.now();
    if (now - prevEmitTime > spreadDelay) {
      prevEmitTime = now;
      return Rx.Observable.of(i); // emit immediately
    } else {
      prevEmitTime += spreadDelay;
      return Rx.Observable.of(i).delay(prevEmitTime - now); // emit somewhere in the future
    }
  })
  .subscribe((request) => {
      console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>

仅使用 RxJS 运算符(包含问题,可能不应该使用):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

click$
  // window will create a new substream whenever no click happened for 1001ms (with the spread out 
  .window(click$
    .concatMap(i => Rx.Observable.of(i).delay(1000))
    .debounceTime(1001)
  )
  .mergeMap(win$ => Rx.Observable.merge(
    win$.take(1).merge(), // emitting the "first" click immediately
    win$.skip(1)
      .merge()
      .concatMap(i => Rx.Observable.of(i).delay(1000)) // each emission after the "first" one will be spread out to 1 seconds
  ))
  .subscribe((request) => {
      console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>

【讨论】:

  • 我考虑过使用窗口,但不是总是需要等待窗口发出吗?
  • 窗口解决方案不是防弹的,是的 - 而且也很复杂......最好忽略它:-)
【解决方案3】:

Mark van Straten 的解决方案对我来说并不完全准确。我从here 中找到了一个更简单、更准确的解决方案。

const source = from([100,500,1500,1501,1502,1503]).pipe(
    mergeMap(i => of(i).pipe(delay(i)))
);

const delayMs = 500;
const stretchedSource = source.pipe(
  concatMap(e => concat(of(e), EMPTY.pipe(delay(delayMs))))
);

【讨论】:

    猜你喜欢
    • 2019-10-18
    • 2016-05-17
    • 2017-06-11
    • 2016-10-29
    • 1970-01-01
    • 1970-01-01
    • 2017-10-15
    • 2021-08-29
    • 1970-01-01
    相关资源
    最近更新 更多